author     Narine Kokhlikyan <narine@slice.com>  2016-07-01 13:55:13 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2016-07-01 13:55:13 -0700
commit     26afb4ce4099e7942f8db1ead3817ed8fbf71ce3 (patch)
tree       a43b0b4dfa9278f8d4f5492b40cfde3c6922c16b /R
parent     c55397652ad1c6d047a8b8eb7fd92a8a1dc66306 (diff)
[SPARK-16012][SPARKR] Implement gapplyCollect, which applies an R function to each group (as gapply does) and collects the result back to an R data.frame

## What changes were proposed in this pull request?

gapplyCollect() performs gapply() on a SparkDataFrame and collects the result back to R. Compared to gapply() followed by collect(), gapplyCollect() offers a performance optimization as well as programming convenience, since no schema needs to be provided. This mirrors dapplyCollect().

## How was this patch tested?

Added test cases for gapplyCollect, analogous to the existing dapplyCollect tests.

Author: Narine Kokhlikyan <narine@slice.com>

Closes #13760 from NarineK/gapplyCollect.
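In SparkR terms, a minimal sketch of the convenience being added (based on the examples in this patch; assumes an active SparkSession, and the data and column names come from the patch's own test fixture):

df <- createDataFrame(
  list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
  c("a", "b", "c", "d"))

# Without this patch: gapply() requires an output schema, plus a separate collect().
schema <- structType(structField("a", "integer"), structField("avg", "double"))
local1 <- collect(gapply(df, "a", function(key, x) {
  data.frame(key, mean(x$b), stringsAsFactors = FALSE)
}, schema))

# With this patch: one call and no schema; column names are set in R instead.
local2 <- gapplyCollect(df, "a", function(key, x) {
  y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
  colnames(y) <- c("a", "avg")
  y
})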
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                             |   1
-rw-r--r--  R/pkg/R/DataFrame.R                         | 111
-rw-r--r--  R/pkg/R/generics.R                          |   4
-rw-r--r--  R/pkg/R/group.R                             |  93
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R   |  35
5 files changed, 177 insertions, 67 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index abc65887bc..bc3aceba22 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -69,6 +69,7 @@ exportMethods("arrange",
"first",
"freqItems",
"gapply",
+ "gapplyCollect",
"group_by",
"groupBy",
"head",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 25327be6ef..5944bbc765 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1344,7 +1344,7 @@ setMethod("dapplyCollect",
#' gapply
#'
-#' Group the SparkDataFrame using the specified columns and apply the R function to each
+#' Groups the SparkDataFrame using the specified columns and applies the R function to each
#' group.
#'
#' @param x A SparkDataFrame
@@ -1356,9 +1356,11 @@ setMethod("dapplyCollect",
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
#' The schema must match the output of `func`. It has to be defined for each
#' output column with preferred output column name and corresponding data type.
+#' @return a SparkDataFrame
#' @family SparkDataFrame functions
#' @rdname gapply
#' @name gapply
+#' @seealso \link{gapplyCollect}
#' @export
#' @examples
#'
@@ -1374,14 +1376,22 @@ setMethod("dapplyCollect",
#' columns with data types integer and string and the mean which is a double.
#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
-#' df1 <- gapply(
+#' result <- gapply(
#' df,
-#' list("a", "c"),
+#' c("a", "c"),
#' function(key, x) {
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-#' },
-#' schema)
-#' collect(df1)
+#' }, schema)
+#'
+#' We can also group the data and afterwards call gapply on the GroupedData.
+#' For example:
+#' gdf <- group_by(df, "a", "c")
+#' result <- gapply(
+#' gdf,
+#' function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#' }, schema)
+#' collect(result)
#'
#' Result
#' ------
@@ -1399,7 +1409,7 @@ setMethod("dapplyCollect",
#' structField("Petal_Width", "double"))
#' df1 <- gapply(
#' df,
-#' list(df$"Species"),
+#' df$"Species",
#' function(key, x) {
#' m <- suppressWarnings(lm(Sepal_Length ~
#' Sepal_Width + Petal_Length + Petal_Width, x))
@@ -1407,8 +1417,8 @@ setMethod("dapplyCollect",
#' }, schema)
#' collect(df1)
#'
-#'Result
-#'---------
+#' Result
+#' ---------
#' Model (Intercept) Sepal_Width Petal_Length Petal_Width
#' 1 0.699883 0.3303370 0.9455356 -0.1697527
#' 2 1.895540 0.3868576 0.9083370 -0.6792238
@@ -1423,6 +1433,89 @@ setMethod("gapply",
gapply(grouped, func, schema)
})
+#' gapplyCollect
+#'
+#' Groups the SparkDataFrame using the specified columns, applies the R function to each
+#' group, and collects the result back to R as a data.frame.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified by the grouping
+#' columns of the SparkDataFrame. The function `func` takes as arguments
+#' a key (the grouping columns) and a local R data.frame for that group.
+#' The output of `func` is a local R data.frame.
+#' @return a data.frame
+#' @family SparkDataFrame functions
+#' @rdname gapplyCollect
+#' @name gapplyCollect
+#' @seealso \link{gapply}
+#' @export
+#' @examples
+#'
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Outputs the grouping values and the average.
+#'
+#' df <- createDataFrame(
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#' c("a", "b", "c", "d"))
+#'
+#' result <- gapplyCollect(
+#' df,
+#' c("a", "c"),
+#' function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#' colnames(y) <- c("key_a", "key_c", "mean_b")
+#' y
+#' })
+#'
+#' We can also group the data and afterwards call gapplyCollect on the GroupedData.
+#' For example:
+#' gdf <- group_by(df, "a", "c")
+#' result <- gapplyCollect(
+#' gdf,
+#' function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#' colnames(y) <- c("key_a", "key_c", "mean_b")
+#' y
+#' })
+#'
+#' Result
+#' ------
+#' key_a key_c mean_b
+#' 3 3 3.0
+#' 1 1 1.5
+#'
+#' Fits linear models on the iris dataset by grouping on the 'Species' column and
+#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
+#' and 'Petal_Width' as training features.
+#'
+#' df <- createDataFrame(iris)
+#' result <- gapplyCollect(
+#' df,
+#' df$"Species",
+#' function(key, x) {
+#' m <- suppressWarnings(lm(Sepal_Length ~
+#' Sepal_Width + Petal_Length + Petal_Width, x))
+#' data.frame(t(coef(m)))
+#' })
+#'
+#' Result
+#' ---------
+#' Model X.Intercept. Sepal_Width Petal_Length Petal_Width
+#' 1 0.699883 0.3303370 0.9455356 -0.1697527
+#' 2 1.895540 0.3868576 0.9083370 -0.6792238
+#' 3 2.351890 0.6548350 0.2375602 0.2521257
+#'
+#' }
+#' @note gapplyCollect(SparkDataFrame) since 2.0.0
+setMethod("gapplyCollect",
+ signature(x = "SparkDataFrame"),
+ function(x, cols, func) {
+ grouped <- do.call("groupBy", c(x, cols))
+ gapplyCollect(grouped, func)
+ })
+
############################## RDD Map Functions ##################################
# All of the following functions mirror the existing RDD map functions, #
# but allow for use with DataFrames by first converting to an RRDD before calling #
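The SparkDataFrame method above is a thin wrapper: it builds the GroupedData itself via do.call("groupBy", ...) and delegates. So the two call shapes below should return the same rows (a sketch reusing the doc example; avgB is an illustrative name, and cross-group ordering is not guaranteed):

df <- createDataFrame(
  list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
  c("a", "b", "c", "d"))
avgB <- function(key, x) {
  y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
  colnames(y) <- c("key_a", "key_c", "mean_b")
  y
}

r1 <- gapplyCollect(df, c("a", "c"), avgB)         # grouping columns passed directly
r2 <- gapplyCollect(group_by(df, "a", "c"), avgB)  # grouping first, then the GroupedData method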
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index d9080b6b32..e4ec508795 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -469,6 +469,10 @@ setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect")
#' @export
setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
+#' @rdname gapplyCollect
+#' @export
+setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })
+
#' @rdname summary
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 0687f14adf..5ed7e8abb4 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -196,64 +196,51 @@ createMethods()
#' gapply
#'
-#' Applies a R function to each group in the input GroupedData
-#'
-#' @param x a GroupedData
-#' @param func A function to be applied to each group partition specified by GroupedData.
-#' The function `func` takes as argument a key - grouping columns and
-#' a data frame - a local R data.frame.
-#' The output of `func` is a local R data.frame.
-#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
-#' The schema must match to output of `func`. It has to be defined for each
-#' output column with preferred output column name and corresponding data type.
-#' @return a SparkDataFrame
+#' @param x A GroupedData
#' @rdname gapply
#' @name gapply
#' @export
-#' @examples
-#' \dontrun{
-#' Computes the arithmetic mean of the second column by grouping
-#' on the first and third columns. Output the grouping values and the average.
-#'
-#' df <- createDataFrame (
-#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-#' c("a", "b", "c", "d"))
-#'
-#' Here our output contains three columns, the key which is a combination of two
-#' columns with data types integer and string and the mean which is a double.
-#' schema <- structType(structField("a", "integer"), structField("c", "string"),
-#' structField("avg", "double"))
-#' df1 <- gapply(
-#' df,
-#' list("a", "c"),
-#' function(key, x) {
-#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-#' },
-#' schema)
-#' collect(df1)
-#'
-#' Result
-#' ------
-#' a c avg
-#' 3 3 3.0
-#' 1 1 1.5
-#' }
#' @note gapply(GroupedData) since 2.0.0
setMethod("gapply",
signature(x = "GroupedData"),
function(x, func, schema) {
- try(if (is.null(schema)) stop("schema cannot be NULL"))
- packageNamesArr <- serialize(.sparkREnv[[".packages"]],
- connection = NULL)
- broadcastArr <- lapply(ls(.broadcastNames),
- function(name) { get(name, .broadcastNames) })
- sdf <- callJStatic(
- "org.apache.spark.sql.api.r.SQLUtils",
- "gapply",
- x@sgd,
- serialize(cleanClosure(func), connection = NULL),
- packageNamesArr,
- broadcastArr,
- schema$jobj)
- dataFrame(sdf)
+ if (is.null(schema)) stop("schema cannot be NULL")
+ gapplyInternal(x, func, schema)
+ })
+
+#' gapplyCollect
+#'
+#' @param x A GroupedData
+#' @rdname gapplyCollect
+#' @name gapplyCollect
+#' @export
+#' @note gapplyCollect(GroupedData) since 2.0.0
+setMethod("gapplyCollect",
+ signature(x = "GroupedData"),
+ function(x, func) {
+ gdf <- gapplyInternal(x, func, NULL)
+ content <- callJMethod(gdf@sdf, "collect")
+ # content is a list of items of struct type. Each item has a single field
+ # which is a serialized data.frame corresponding to one group of the
+ # SparkDataFrame.
+ ldfs <- lapply(content, function(x) { unserialize(x[[1]]) })
+ ldf <- do.call(rbind, ldfs)
+ row.names(ldf) <- NULL
+ ldf
})
+
+gapplyInternal <- function(x, func, schema) {
+ packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+ connection = NULL)
+ broadcastArr <- lapply(ls(.broadcastNames),
+ function(name) { get(name, .broadcastNames) })
+ sdf <- callJStatic(
+ "org.apache.spark.sql.api.r.SQLUtils",
+ "gapply",
+ x@sgd,
+ serialize(cleanClosure(func), connection = NULL),
+ packageNamesArr,
+ broadcastArr,
+ if (class(schema) == "structType") { schema$jobj } else { NULL })
+ dataFrame(sdf)
+} \ No newline at end of file
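Each row collected from the JVM carries one field holding a serialized R data.frame for one group; gapplyCollect(GroupedData) unserializes each and binds them. A pure-R mimic of just that recombination step (serialize()/unserialize() stand in for the JVM round trip; purely illustrative):

groups <- split(data.frame(a = c(1L, 1L, 3L), b = c(1, 2, 3)), c(1, 1, 2))
# Stand-in for the collected rows: one serialized data.frame per group.
content <- lapply(groups, function(g) {
  list(serialize(data.frame(a = g$a[1], avg = mean(g$b)), connection = NULL))
})
ldfs <- lapply(content, function(x) { unserialize(x[[1]]) })
ldf <- do.call(rbind, ldfs)
row.names(ldf) <- NULL  # same normalization the method performs
ldf
#    a avg
# 1  1 1.5
# 2  3 3.0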
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 588c217f3c..3b8d5707f4 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2257,21 +2257,24 @@ test_that("repartition by columns on DataFrame", {
expect_equal(nrow(df1), 2)
})
-test_that("gapply() on a DataFrame", {
+test_that("gapply() and gapplyCollect() on a DataFrame", {
df <- createDataFrame (
list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
c("a", "b", "c", "d"))
expected <- collect(df)
- df1 <- gapply(df, list("a"), function(key, x) { x }, schema(df))
+ df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
actual <- collect(df1)
expect_identical(actual, expected)
+ df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
+ expect_identical(df1Collect, expected)
+
# Computes the sum of the second column by grouping on the first and third columns
# and checks if the sum is larger than 2
schema <- structType(structField("a", "integer"), structField("e", "boolean"))
df2 <- gapply(
df,
- list(df$"a", df$"c"),
+ c(df$"a", df$"c"),
function(key, x) {
y <- data.frame(key[1], sum(x$b) > 2)
},
@@ -2280,13 +2283,24 @@ test_that("gapply() on a DataFrame", {
expected <- c(TRUE, TRUE)
expect_identical(actual, expected)
+ df2Collect <- gapplyCollect(
+ df,
+ c(df$"a", df$"c"),
+ function(key, x) {
+ y <- data.frame(key[1], sum(x$b) > 2)
+ colnames(y) <- c("a", "e")
+ y
+ })
+ actual <- df2Collect$e
+ expect_identical(actual, expected)
+
# Computes the arithmetic mean of the second column by grouping
# on the first and third columns. Outputs the grouping value and the average.
schema <- structType(structField("a", "integer"), structField("c", "string"),
structField("avg", "double"))
df3 <- gapply(
df,
- list("a", "c"),
+ c("a", "c"),
function(key, x) {
y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
},
@@ -2301,11 +2315,22 @@ test_that("gapply() on a DataFrame", {
rownames(expected) <- NULL
expect_identical(actual, expected)
+ df3Collect <- gapplyCollect(
+ df,
+ c("a", "c"),
+ function(key, x) {
+ y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+ colnames(y) <- c("a", "c", "avg")
+ y
+ })
+ actual <- df3Collect[order(df3Collect$a), ]
+ expect_identical(actual$avg, expected$avg)
+
irisDF <- suppressWarnings(createDataFrame (iris))
schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
# Groups by `Sepal_Length` and computes the average for `Sepal_Width`
df4 <- gapply(
- cols = list("Sepal_Length"),
+ cols = "Sepal_Length",
irisDF,
function(key, x) {
y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)