author     Sun Rui <rui.sun@intel.com>                       2016-05-05 18:49:43 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2016-05-05 18:49:43 -0700
commit     157a49aa410dc1870cd171148d317084c5a90d23 (patch)
tree       c4d4d81b171c24308c70c8289351c0e7c497ff98 /R/pkg
parent     7f5922aa4a810a0b9cc783956a8b7aa3dad86a0a (diff)
[SPARK-11395][SPARKR] Support over and window specification in SparkR.
This PR:
1. Implement the WindowSpec S4 class.
2. Implement window.partitionBy() and window.orderBy() as utility functions to create WindowSpec objects.
3. Implement over() on the Column class.

Author: Sun Rui <rui.sun@intel.com>
Author: Sun Rui <sunrui2016@gmail.com>

Closes #10094 from sun-rui/SPARK-11395.
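A minimal usage sketch of the new API (assuming a SparkDataFrame `df` with columns "key" and "value", mirroring the tests added in this patch):

    # Partition rows by "key", order them within each partition by "value",
    # then evaluate lead() over the resulting window.
    ws <- orderBy(window.partitionBy("key"), "value")
    df2 <- select(df, over(lead("value", 1), ws))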
Diffstat (limited to 'R/pkg')
-rw-r--r--  R/pkg/DESCRIPTION                          |   2
-rw-r--r--  R/pkg/NAMESPACE                            |  10
-rw-r--r--  R/pkg/R/DataFrame.R                        |   4
-rw-r--r--  R/pkg/R/WindowSpec.R                       | 188
-rw-r--r--  R/pkg/R/generics.R                         |  29
-rw-r--r--  R/pkg/R/pairRDD.R                          |   4
-rw-r--r--  R/pkg/R/window.R                           |  98
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R  |  36
8 files changed, 364 insertions(+), 7 deletions(-)
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 7179438efc..963a1bb580 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -26,6 +26,7 @@ Collate:
'pairRDD.R'
'DataFrame.R'
'SQLContext.R'
+ 'WindowSpec.R'
'backend.R'
'broadcast.R'
'client.R'
@@ -38,4 +39,5 @@ Collate:
'stats.R'
'types.R'
'utils.R'
+ 'window.R'
RoxygenNote: 5.0.1
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 73f7c595f4..1432ab8a9d 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -216,6 +216,7 @@ exportMethods("%in%",
"next_day",
"ntile",
"otherwise",
+ "over",
"percent_rank",
"pmod",
"quarter",
@@ -315,3 +316,12 @@ export("structField",
"structType.jobj",
"structType.structField",
"print.structType")
+
+exportClasses("WindowSpec")
+
+export("partitionBy",
+ "rowsBetween",
+ "rangeBetween")
+
+export("window.partitionBy",
+ "window.orderBy")
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index fcf473ac7b..43c46b8474 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1749,8 +1749,8 @@ setMethod("arrange",
#' @export
setMethod("orderBy",
signature(x = "SparkDataFrame", col = "characterOrColumn"),
- function(x, col) {
- arrange(x, col)
+ function(x, col, ...) {
+ arrange(x, col, ...)
})
#' Filter
diff --git a/R/pkg/R/WindowSpec.R b/R/pkg/R/WindowSpec.R
new file mode 100644
index 0000000000..581176a6c0
--- /dev/null
+++ b/R/pkg/R/WindowSpec.R
@@ -0,0 +1,188 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# WindowSpec.R - WindowSpec class and methods implemented in S4 OO classes
+
+#' @include generics.R jobj.R column.R
+NULL
+
+#' @title S4 class that represents a WindowSpec
+#' @description WindowSpec can be created by using window.partitionBy()
+#' or window.orderBy().
+#' @rdname WindowSpec
+#' @seealso \link{window.partitionBy}, \link{window.orderBy}
+#'
+#' @param sws A Java object reference to the backing Scala WindowSpec
+#' @export
+setClass("WindowSpec",
+ slots = list(sws = "jobj"))
+
+setMethod("initialize", "WindowSpec", function(.Object, sws) {
+ .Object@sws <- sws
+ .Object
+})
+
+windowSpec <- function(sws) {
+ stopifnot(class(sws) == "jobj")
+ new("WindowSpec", sws)
+}
+
+#' @rdname show
+setMethod("show", "WindowSpec",
+ function(object) {
+ cat("WindowSpec", callJMethod(object@sws, "toString"), "\n")
+ })
+
+#' partitionBy
+#'
+#' Defines the partitioning columns in a WindowSpec.
+#'
+#' @param x a WindowSpec
+#' @return a WindowSpec
+#' @rdname partitionBy
+#' @name partitionBy
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#' partitionBy(ws, "col1", "col2")
+#' partitionBy(ws, df$col1, df$col2)
+#' }
+setMethod("partitionBy",
+ signature(x = "WindowSpec"),
+ function(x, col, ...) {
+ stopifnot (class(col) %in% c("character", "Column"))
+
+ if (class(col) == "character") {
+ windowSpec(callJMethod(x@sws, "partitionBy", col, list(...)))
+ } else {
+ jcols <- lapply(list(col, ...), function(c) {
+ c@jc
+ })
+ windowSpec(callJMethod(x@sws, "partitionBy", jcols))
+ }
+ })
+
+#' orderBy
+#'
+#' Defines the ordering columns in a WindowSpec.
+#'
+#' @param x a WindowSpec
+#' @return a WindowSpec
+#' @rdname arrange
+#' @name orderBy
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#' orderBy(ws, "col1", "col2")
+#' orderBy(ws, df$col1, df$col2)
+#' }
+setMethod("orderBy",
+ signature(x = "WindowSpec", col = "character"),
+ function(x, col, ...) {
+ windowSpec(callJMethod(x@sws, "orderBy", col, list(...)))
+ })
+
+#' @rdname arrange
+#' @name orderBy
+#' @export
+setMethod("orderBy",
+ signature(x = "WindowSpec", col = "Column"),
+ function(x, col, ...) {
+ jcols <- lapply(list(col, ...), function(c) {
+ c@jc
+ })
+ windowSpec(callJMethod(x@sws, "orderBy", jcols))
+ })
+
+#' rowsBetween
+#'
+#' Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+#'
+#' Both `start` and `end` are relative positions from the current row. For example, "0" means
+#' "current row", while "-1" means the row before the current row, and "5" means the fifth row
+#' after the current row.
+#'
+#' @param x a WindowSpec
+#' @param start boundary start, inclusive.
+#' The frame is unbounded if this is the minimum long value.
+#' @param end boundary end, inclusive.
+#' The frame is unbounded if this is the maximum long value.
+#' @return a WindowSpec
+#' @rdname rowsBetween
+#' @name rowsBetween
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#' rowsBetween(ws, 0, 3)
+#' }
+setMethod("rowsBetween",
+ signature(x = "WindowSpec", start = "numeric", end = "numeric"),
+ function(x, start, end) {
+ # "start" and "end" should be long, due to serde limitation,
+ # limit "start" and "end" as integer now
+ windowSpec(callJMethod(x@sws, "rowsBetween", as.integer(start), as.integer(end)))
+ })
+
+#' rangeBetween
+#'
+#' Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+#'
+#' Both `start` and `end` are relative to the current row. For example, "0" means "current row",
+#' while "-1" means one off before the current row, and "5" means five off after the
+#' current row.
+#'
+#' @param x a WindowSpec
+#' @param start boundary start, inclusive.
+#' The frame is unbounded if this is the minimum long value.
+#' @param end boundary end, inclusive.
+#' The frame is unbounded if this is the maximum long value.
+#' @return a WindowSpec
+#' @rdname rangeBetween
+#' @name rangeBetween
+#' @family windowspec_method
+#' @export
+#' @examples
+#' \dontrun{
+#' rangeBetween(ws, 0, 3)
+#' }
+setMethod("rangeBetween",
+ signature(x = "WindowSpec", start = "numeric", end = "numeric"),
+ function(x, start, end) {
+ # "start" and "end" should be long, due to serde limitation,
+ # limit "start" and "end" as integer now
+ windowSpec(callJMethod(x@sws, "rangeBetween", as.integer(start), as.integer(end)))
+ })
+
+# Note that over is a method of Column class, but it is placed here to
+# avoid Roxygen circular-dependency between class Column and WindowSpec.
+
+#' over
+#'
+#' Define a windowing column.
+#'
+#' @rdname over
+#' @name over
+#' @family colum_func
+#' @export
+setMethod("over",
+ signature(x = "Column", window = "WindowSpec"),
+ function(x, window) {
+ column(callJMethod(x@jc, "over", window@sws))
+ })
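Note on the frame-boundary methods above: rowsBetween() counts physical rows relative to the current row, while rangeBetween() compares values of the ordering expression. A minimal sketch, not part of this patch, assuming a SparkDataFrame `df` with columns "key" and "value":

    # Sum "value" over the previous, current, and next row within each "key" partition.
    ws <- rowsBetween(orderBy(window.partitionBy("key"), "value"), -1, 1)
    df2 <- select(df, over(sum(df$value), ws))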
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 3db1ac0766..8563be1e64 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -339,9 +339,9 @@ setGeneric("join", function(x, y, ...) { standardGeneric("join") })
# @export
setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })
-# @rdname partitionBy
-# @export
-setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") })
+#' @rdname partitionBy
+#' @export
+setGeneric("partitionBy", function(x, ...) { standardGeneric("partitionBy") })
# @rdname reduceByKey
# @seealso groupByKey
@@ -533,7 +533,7 @@ setGeneric("mutate", function(.data, ...) {standardGeneric("mutate") })
#' @rdname arrange
#' @export
-setGeneric("orderBy", function(x, col) { standardGeneric("orderBy") })
+setGeneric("orderBy", function(x, col, ...) { standardGeneric("orderBy") })
#' @rdname schema
#' @export
@@ -733,6 +733,27 @@ setGeneric("when", function(condition, value) { standardGeneric("when") })
#' @export
setGeneric("otherwise", function(x, value) { standardGeneric("otherwise") })
+#' @rdname over
+#' @export
+setGeneric("over", function(x, window) { standardGeneric("over") })
+
+###################### WindowSpec Methods ##########################
+
+#' @rdname rowsBetween
+#' @export
+setGeneric("rowsBetween", function(x, start, end) { standardGeneric("rowsBetween") })
+
+#' @rdname rangeBetween
+#' @export
+setGeneric("rangeBetween", function(x, start, end) { standardGeneric("rangeBetween") })
+
+#' @rdname window.partitionBy
+#' @export
+setGeneric("window.partitionBy", function(col, ...) { standardGeneric("window.partitionBy") })
+
+#' @rdname window.orderBy
+#' @export
+setGeneric("window.orderBy", function(col, ...) { standardGeneric("window.orderBy") })
###################### Expression Function Methods ##########################
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 4075ef4377..d39775cabe 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -205,8 +205,10 @@ setMethod("flatMapValues",
#' @aliases partitionBy,RDD,integer-method
#' @noRd
setMethod("partitionBy",
- signature(x = "RDD", numPartitions = "numeric"),
+ signature(x = "RDD"),
function(x, numPartitions, partitionFunc = hashCode) {
+ stopifnot(is.numeric(numPartitions))
+
partitionFunc <- cleanClosure(partitionFunc)
serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)
diff --git a/R/pkg/R/window.R b/R/pkg/R/window.R
new file mode 100644
index 0000000000..7ecf70abc6
--- /dev/null
+++ b/R/pkg/R/window.R
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# window.R - Utility functions for defining windows in DataFrames
+
+#' window.partitionBy
+#'
+#' Creates a WindowSpec with the partitioning defined.
+#'
+#' @rdname window.partitionBy
+#' @name window.partitionBy
+#' @export
+#' @examples
+#' \dontrun{
+#' ws <- window.partitionBy("key1", "key2")
+#' df1 <- select(df, over(lead("value", 1), ws))
+#'
+#' ws <- window.partitionBy(df$key1, df$key2)
+#' df1 <- select(df, over(lead("value", 1), ws))
+#' }
+setMethod("window.partitionBy",
+ signature(col = "character"),
+ function(col, ...) {
+ windowSpec(
+ callJStatic("org.apache.spark.sql.expressions.Window",
+ "partitionBy",
+ col,
+ list(...)))
+ })
+
+#' @rdname window.partitionBy
+#' @name window.partitionBy
+#' @export
+setMethod("window.partitionBy",
+ signature(col = "Column"),
+ function(col, ...) {
+ jcols <- lapply(list(col, ...), function(c) {
+ c@jc
+ })
+ windowSpec(
+ callJStatic("org.apache.spark.sql.expressions.Window",
+ "partitionBy",
+ jcols))
+ })
+
+#' window.orderBy
+#'
+#' Creates a WindowSpec with the ordering defined.
+#'
+#' @rdname window.orderBy
+#' @name window.orderBy
+#' @export
+#' @examples
+#' \dontrun{
+#' ws <- window.orderBy("key1", "key2")
+#' df1 <- select(df, over(lead("value", 1), ws))
+#'
+#' ws <- window.orderBy(df$key1, df$key2)
+#' df1 <- select(df, over(lead("value", 1), ws))
+#' }
+setMethod("window.orderBy",
+ signature(col = "character"),
+ function(col, ...) {
+ windowSpec(
+ callJStatic("org.apache.spark.sql.expressions.Window",
+ "orderBy",
+ col,
+ list(...)))
+ })
+
+#' @rdname window.orderBy
+#' @name window.orderBy
+#' @export
+setMethod("window.orderBy",
+ signature(col = "Column"),
+ function(col, ...) {
+ jcols <- lapply(list(col, ...), function(c) {
+ c@jc
+ })
+ windowSpec(
+ callJStatic("org.apache.spark.sql.expressions.Window",
+ "orderBy",
+ jcols))
+ })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 3b6a27c3b8..0f67bc2e33 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2118,6 +2118,42 @@ test_that("repartition by columns on DataFrame", {
expect_equal(nrow(df1), 2)
})
+test_that("Window functions on a DataFrame", {
+ ssc <- callJMethod(sc, "sc")
+ hiveCtx <- tryCatch({
+ newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
+ },
+ error = function(err) {
+ skip("Hive is not build with SparkSQL, skipped")
+ })
+
+ df <- createDataFrame(hiveCtx,
+ list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
+ schema = c("key", "value"))
+ ws <- orderBy(window.partitionBy("key"), "value")
+ result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+ names(result) <- c("key", "value")
+ expected <- data.frame(key = c(1L, NA, 2L, NA),
+ value = c("1", NA, "2", NA),
+ stringsAsFactors = FALSE)
+ expect_equal(result, expected)
+
+ ws <- orderBy(window.partitionBy(df$key), df$value)
+ result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+ names(result) <- c("key", "value")
+ expect_equal(result, expected)
+
+ ws <- partitionBy(window.orderBy("value"), "key")
+ result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+ names(result) <- c("key", "value")
+ expect_equal(result, expected)
+
+ ws <- partitionBy(window.orderBy(df$value), df$key)
+ result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
+ names(result) <- c("key", "value")
+ expect_equal(result, expected)
+})
+
unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)