From 26afb4ce4099e7942f8db1ead3817ed8fbf71ce3 Mon Sep 17 00:00:00 2001 From: Narine Kokhlikyan Date: Fri, 1 Jul 2016 13:55:13 -0700 Subject: [SPARK-16012][SPARKR] Implement gapplyCollect which will apply a R function on each group similar to gapply and collect the result back to R data.frame ## What changes were proposed in this pull request? gapplyCollect() does gapply() on a SparkDataFrame and collect the result back to R. Compared to gapply() + collect(), gapplyCollect() offers performance optimization as well as programming convenience, as no schema is needed to be provided. This is similar to dapplyCollect(). ## How was this patch tested? Added test cases for gapplyCollect similar to dapplyCollect Author: Narine Kokhlikyan Closes #13760 from NarineK/gapplyCollect. --- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 111 +++++++++++++++++++++++++++--- R/pkg/R/generics.R | 4 ++ R/pkg/R/group.R | 93 +++++++++++-------------- R/pkg/inst/tests/testthat/test_sparkSQL.R | 35 ++++++++-- 5 files changed, 177 insertions(+), 67 deletions(-) (limited to 'R') diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index abc65887bc..bc3aceba22 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -69,6 +69,7 @@ exportMethods("arrange", "first", "freqItems", "gapply", + "gapplyCollect", "group_by", "groupBy", "head", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 25327be6ef..5944bbc765 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1344,7 +1344,7 @@ setMethod("dapplyCollect", #' gapply #' -#' Group the SparkDataFrame using the specified columns and apply the R function to each +#' Groups the SparkDataFrame using the specified columns and applies the R function to each #' group. #' #' @param x A SparkDataFrame @@ -1356,9 +1356,11 @@ setMethod("dapplyCollect", #' @param schema The schema of the resulting SparkDataFrame after the function is applied. #' The schema must match to output of `func`. 
It has to be defined for each #' output column with preferred output column name and corresponding data type. +#' @return a SparkDataFrame #' @family SparkDataFrame functions #' @rdname gapply #' @name gapply +#' @seealso \link{gapplyCollect} #' @export #' @examples #' @@ -1374,14 +1376,22 @@ setMethod("dapplyCollect", #' columns with data types integer and string and the mean which is a double. #' schema <- structType(structField("a", "integer"), structField("c", "string"), #' structField("avg", "double")) -#' df1 <- gapply( +#' result <- gapply( #' df, -#' list("a", "c"), +#' c("a", "c"), #' function(key, x) { #' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) -#' }, -#' schema) -#' collect(df1) +#' }, schema) +#' +#' We can also group the data and afterwards call gapply on GroupedData. +#' For Example: +#' gdf <- group_by(df, "a", "c") +#' result <- gapply( +#' gdf, +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' }, schema) +#' collect(result) #' #' Result #' ------ @@ -1399,7 +1409,7 @@ setMethod("dapplyCollect", #' structField("Petal_Width", "double")) #' df1 <- gapply( #' df, -#' list(df$"Species"), +#' df$"Species", #' function(key, x) { #' m <- suppressWarnings(lm(Sepal_Length ~ #' Sepal_Width + Petal_Length + Petal_Width, x)) @@ -1407,8 +1417,8 @@ setMethod("dapplyCollect", #' }, schema) #' collect(df1) #' -#'Result -#'--------- +#' Result +#' --------- #' Model (Intercept) Sepal_Width Petal_Length Petal_Width #' 1 0.699883 0.3303370 0.9455356 -0.1697527 #' 2 1.895540 0.3868576 0.9083370 -0.6792238 @@ -1423,6 +1433,89 @@ setMethod("gapply", gapply(grouped, func, schema) }) +#' gapplyCollect +#' +#' Groups the SparkDataFrame using the specified columns, applies the R function to each +#' group and collects the result back to R as data.frame. 
+#' +#' @param x A SparkDataFrame +#' @param cols Grouping columns +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. The function `func` takes as argument +#' a key - grouping columns and a data frame - a local R data.frame. +#' The output of `func` is a local R data.frame. +#' @return a data.frame +#' @family SparkDataFrame functions +#' @rdname gapplyCollect +#' @name gapplyCollect +#' @seealso \link{gapply} +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' result <- gapplyCollect( +#' df, +#' c("a", "c"), +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' colnames(y) <- c("key_a", "key_c", "mean_b") +#' y +#' }) +#' +#' We can also group the data and afterwards call gapplyCollect on GroupedData. +#' For example: +#' gdf <- group_by(df, "a", "c") +#' result <- gapplyCollect( +#' gdf, +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' colnames(y) <- c("key_a", "key_c", "mean_b") +#' y +#' }) +#' +#' Result +#' ------ +#' key_a key_c mean_b +#' 3 3 3.0 +#' 1 1 1.5 +#' +#' Fits linear models on iris dataset by grouping on the 'Species' column and +#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length' +#' and 'Petal_Width' as training features. +#' +#' df <- createDataFrame (iris) +#' result <- gapplyCollect( +#' df, +#' df$"Species", +#' function(key, x) { +#' m <- suppressWarnings(lm(Sepal_Length ~ +#' Sepal_Width + Petal_Length + Petal_Width, x)) +#' data.frame(t(coef(m))) +#' }) +#' +#' Result +#' --------- +#' Model X.Intercept.
Sepal_Width Petal_Length Petal_Width +#' 1 0.699883 0.3303370 0.9455356 -0.1697527 +#' 2 1.895540 0.3868576 0.9083370 -0.6792238 +#' 3 2.351890 0.6548350 0.2375602 0.2521257 +#' +#'} +#' @note gapplyCollect(SparkDataFrame) since 2.0.0 +setMethod("gapplyCollect", + signature(x = "SparkDataFrame"), + function(x, cols, func) { + grouped <- do.call("groupBy", c(x, cols)) + gapplyCollect(grouped, func) + }) + ############################## RDD Map Functions ################################## # All of the following functions mirror the existing RDD map functions, # # but allow for use with DataFrames by first converting to an RRDD before calling # diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index d9080b6b32..e4ec508795 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -469,6 +469,10 @@ setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect") #' @export setGeneric("gapply", function(x, ...) { standardGeneric("gapply") }) +#' @rdname gapplyCollect +#' @export +setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") }) + #' @rdname summary #' @export setGeneric("describe", function(x, col, ...) { standardGeneric("describe") }) diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R index 0687f14adf..5ed7e8abb4 100644 --- a/R/pkg/R/group.R +++ b/R/pkg/R/group.R @@ -196,64 +196,51 @@ createMethods() #' gapply #' -#' Applies a R function to each group in the input GroupedData -#' -#' @param x a GroupedData -#' @param func A function to be applied to each group partition specified by GroupedData. -#' The function `func` takes as argument a key - grouping columns and -#' a data frame - a local R data.frame. -#' The output of `func` is a local R data.frame. -#' @param schema The schema of the resulting SparkDataFrame after the function is applied. -#' The schema must match to output of `func`. It has to be defined for each -#' output column with preferred output column name and corresponding data type. 
-#' @return a SparkDataFrame +#' @param x A GroupedData #' @rdname gapply #' @name gapply #' @export -#' @examples -#' \dontrun{ -#' Computes the arithmetic mean of the second column by grouping -#' on the first and third columns. Output the grouping values and the average. -#' -#' df <- createDataFrame ( -#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), -#' c("a", "b", "c", "d")) -#' -#' Here our output contains three columns, the key which is a combination of two -#' columns with data types integer and string and the mean which is a double. -#' schema <- structType(structField("a", "integer"), structField("c", "string"), -#' structField("avg", "double")) -#' df1 <- gapply( -#' df, -#' list("a", "c"), -#' function(key, x) { -#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) -#' }, -#' schema) -#' collect(df1) -#' -#' Result -#' ------ -#' a c avg -#' 3 3 3.0 -#' 1 1 1.5 -#' } #' @note gapply(GroupedData) since 2.0.0 setMethod("gapply", signature(x = "GroupedData"), function(x, func, schema) { - try(if (is.null(schema)) stop("schema cannot be NULL")) - packageNamesArr <- serialize(.sparkREnv[[".packages"]], - connection = NULL) - broadcastArr <- lapply(ls(.broadcastNames), - function(name) { get(name, .broadcastNames) }) - sdf <- callJStatic( - "org.apache.spark.sql.api.r.SQLUtils", - "gapply", - x@sgd, - serialize(cleanClosure(func), connection = NULL), - packageNamesArr, - broadcastArr, - schema$jobj) - dataFrame(sdf) + if (is.null(schema)) stop("schema cannot be NULL") + gapplyInternal(x, func, schema) + }) + +#' gapplyCollect +#' +#' @param x A GroupedData +#' @rdname gapplyCollect +#' @name gapplyCollect +#' @export +#' @note gapplyCollect(GroupedData) since 2.0.0 +setMethod("gapplyCollect", + signature(x = "GroupedData"), + function(x, func) { + gdf <- gapplyInternal(x, func, NULL) + content <- callJMethod(gdf@sdf, "collect") + # content is a list of items of struct type. 
Each item has a single field + # which is a serialized data.frame corresponding to one group of the + # SparkDataFrame. + ldfs <- lapply(content, function(x) { unserialize(x[[1]]) }) + ldf <- do.call(rbind, ldfs) + row.names(ldf) <- NULL + ldf }) + +gapplyInternal <- function(x, func, schema) { + packageNamesArr <- serialize(.sparkREnv[[".packages"]], + connection = NULL) + broadcastArr <- lapply(ls(.broadcastNames), + function(name) { get(name, .broadcastNames) }) + sdf <- callJStatic( + "org.apache.spark.sql.api.r.SQLUtils", + "gapply", + x@sgd, + serialize(cleanClosure(func), connection = NULL), + packageNamesArr, + broadcastArr, + if (class(schema) == "structType") { schema$jobj } else { NULL }) + dataFrame(sdf) +} \ No newline at end of file diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 588c217f3c..3b8d5707f4 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2257,21 +2257,24 @@ test_that("repartition by columns on DataFrame", { expect_equal(nrow(df1), 2) }) -test_that("gapply() on a DataFrame", { +test_that("gapply() and gapplyCollect() on a DataFrame", { df <- createDataFrame ( list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), c("a", "b", "c", "d")) expected <- collect(df) - df1 <- gapply(df, list("a"), function(key, x) { x }, schema(df)) + df1 <- gapply(df, "a", function(key, x) { x }, schema(df)) actual <- collect(df1) expect_identical(actual, expected) + df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x }) + expect_identical(df1Collect, expected) + # Computes the sum of second column by grouping on the first and third columns # and checks if the sum is larger than 2 schema <- structType(structField("a", "integer"), structField("e", "boolean")) df2 <- gapply( df, - list(df$"a", df$"c"), + c(df$"a", df$"c"), function(key, x) { y <- data.frame(key[1], sum(x$b) > 2) }, @@ -2280,13 +2283,24 @@
test_that("gapply() on a DataFrame", { expected <- c(TRUE, TRUE) expect_identical(actual, expected) + df2Collect <- gapplyCollect( + df, + c(df$"a", df$"c"), + function(key, x) { + y <- data.frame(key[1], sum(x$b) > 2) + colnames(y) <- c("a", "e") + y + }) + actual <- df2Collect$e + expect_identical(actual, expected) + # Computes the arithmetic mean of the second column by grouping # on the first and third columns. Output the groupping value and the average. schema <- structType(structField("a", "integer"), structField("c", "string"), structField("avg", "double")) df3 <- gapply( df, - list("a", "c"), + c("a", "c"), function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) }, @@ -2301,11 +2315,22 @@ test_that("gapply() on a DataFrame", { rownames(expected) <- NULL expect_identical(actual, expected) + df3Collect <- gapplyCollect( + df, + c("a", "c"), + function(key, x) { + y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) + colnames(y) <- c("a", "c", "avg") + y + }) + actual <- df3Collect[order(df3Collect$a), ] + expect_identical(actual$avg, expected$avg) + irisDF <- suppressWarnings(createDataFrame (iris)) schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double")) # Groups by `Sepal_Length` and computes the average for `Sepal_Width` df4 <- gapply( - cols = list("Sepal_Length"), + cols = "Sepal_Length", irisDF, function(key, x) { y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE) -- cgit v1.2.3