From 26afb4ce4099e7942f8db1ead3817ed8fbf71ce3 Mon Sep 17 00:00:00 2001 From: Narine Kokhlikyan Date: Fri, 1 Jul 2016 13:55:13 -0700 Subject: [SPARK-16012][SPARKR] Implement gapplyCollect which will apply a R function on each group similar to gapply and collect the result back to R data.frame ## What changes were proposed in this pull request? gapplyCollect() does gapply() on a SparkDataFrame and collect the result back to R. Compared to gapply() + collect(), gapplyCollect() offers performance optimization as well as programming convenience, as no schema is needed to be provided. This is similar to dapplyCollect(). ## How was this patch tested? Added test cases for gapplyCollect similar to dapplyCollect Author: Narine Kokhlikyan Closes #13760 from NarineK/gapplyCollect. --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 35 ++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) (limited to 'R/pkg/inst') diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 588c217f3c..3b8d5707f4 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2257,21 +2257,24 @@ test_that("repartition by columns on DataFrame", { expect_equal(nrow(df1), 2) }) -test_that("gapply() on a DataFrame", { +test_that("gapply() and gapplyCollect() on a DataFrame", { df <- createDataFrame ( list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), c("a", "b", "c", "d")) expected <- collect(df) - df1 <- gapply(df, list("a"), function(key, x) { x }, schema(df)) + df1 <- gapply(df, "a", function(key, x) { x }, schema(df)) actual <- collect(df1) expect_identical(actual, expected) + df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x }) + expect_identical(df1Collect, expected) + # Computes the sum of second column by grouping on the first and third columns # and checks if the sum is larger than 2 schema <- structType(structField("a", "integer"), structField("e", "boolean")) df2 <- gapply( df, - list(df$"a", df$"c"), + c(df$"a", df$"c"), function(key, x) { y <- data.frame(key[1], sum(x$b) > 2) }, @@ -2280,13 +2283,24 @@ test_that("gapply() on a DataFrame", { expected <- c(TRUE, TRUE) expect_identical(actual, expected) + df2Collect <- gapplyCollect( + df, + c(df$"a", df$"c"), + function(key, x) { + y <- data.frame(key[1], sum(x$b) > 2) + colnames(y) <- c("a", "e") + y + }) + actual <- df2Collect$e + expect_identical(actual, expected) + # Computes the arithmetic mean of the second column by grouping # on the first and third columns. Output the groupping value and the average. schema <- structType(structField("a", "integer"), structField("c", "string"), structField("avg", "double")) df3 <- gapply( df, - list("a", "c"), + c("a", "c"), function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) }, @@ -2301,11 +2315,22 @@ test_that("gapply() on a DataFrame", { rownames(expected) <- NULL expect_identical(actual, expected) + df3Collect <- gapplyCollect( + df, + c("a", "c"), + function(key, x) { + y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) + colnames(y) <- c("a", "c", "avg") + y + }) + actual <- df3Collect[order(df3Collect$a), ] + expect_identical(actual$avg, expected$avg) + irisDF <- suppressWarnings(createDataFrame (iris)) schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double")) # Groups by `Sepal_Length` and computes the average for `Sepal_Width` df4 <- gapply( - cols = list("Sepal_Length"), + cols = "Sepal_Length", irisDF, function(key, x) { y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE) -- cgit v1.2.3