From 4ae9fe091c2cb8388c581093d62d3deaef40993e Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Fri, 29 Apr 2016 16:41:07 -0700 Subject: [SPARK-12919][SPARKR] Implement dapply() on DataFrame in SparkR. ## What changes were proposed in this pull request? dapply() applies an R function on each partition of a DataFrame and returns a new DataFrame. The function signature is: dapply(df, function(localDF) {}, schema = NULL) R function input: local data.frame from the partition on local node R function output: local data.frame Schema specifies the Row format of the resulting DataFrame. It must match the R function's output. If schema is not specified, each partition of the result DataFrame will be serialized in R into a single byte array. Such resulting DataFrame can be processed by successive calls to dapply(). ## How was this patch tested? SparkR unit tests. Author: Sun Rui Author: Sun Rui Closes #12493 from sun-rui/SPARK-12919. --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 40 +++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) (limited to 'R/pkg/inst/tests') diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 7058265ea3..5cf9dc405b 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2043,6 +2043,46 @@ test_that("Histogram", { df <- as.DataFrame(sqlContext, data.frame(x = c(1, 2, 3, 4, 100))) expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1)) }) + +test_that("dapply() on a DataFrame", { + df <- createDataFrame ( + sqlContext, + list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), + c("a", "b", "c")) + ldf <- collect(df) + df1 <- dapply(df, function(x) { x }, schema(df)) + result <- collect(df1) + expect_identical(ldf, result) + + + # Filter and add a column + schema <- structType(structField("a", "integer"), structField("b", "double"), + structField("c", "string"), structField("d", "integer")) + df1 <- dapply( + df, + function(x) { + y <- x[x$a > 1, ] + y <- cbind(y, y$a + 1L) + }, + schema) + result <- collect(df1) + expected <- ldf[ldf$a > 1, ] + expected$d <- expected$a + 1L + rownames(expected) <- NULL + expect_identical(expected, result) + + # Remove the added column + df2 <- dapply( + df1, + function(x) { + x[, c("a", "b", "c")] + }, + schema(df)) + result <- collect(df2) + expected <- expected[, c("a", "b", "c")] + expect_identical(expected, result) +}) + unlink(parquetPath) unlink(jsonPath) unlink(jsonPathNa) -- cgit v1.2.3