diff options
author | Sun Rui <rui.sun@intel.com> | 2016-04-29 16:41:07 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2016-04-29 16:41:07 -0700 |
commit | 4ae9fe091c2cb8388c581093d62d3deaef40993e (patch) | |
tree | fd84ce605c0ea8bd9d0b2e307119bd5d8651c9f5 /R/pkg/inst/tests/testthat/test_sparkSQL.R | |
parent | d78fbcc3cc9c379b4a548ebc816c6f71cc71a16e (diff) | |
download | spark-4ae9fe091c2cb8388c581093d62d3deaef40993e.tar.gz spark-4ae9fe091c2cb8388c581093d62d3deaef40993e.tar.bz2 spark-4ae9fe091c2cb8388c581093d62d3deaef40993e.zip |
[SPARK-12919][SPARKR] Implement dapply() on DataFrame in SparkR.
## What changes were proposed in this pull request?
dapply() applies an R function on each partition of a DataFrame and returns a new DataFrame.
The function signature is:
dapply(df, function(localDF) {}, schema = NULL)
R function input: local data.frame from the partition on local node
R function output: local data.frame
Schema specifies the Row format of the resulting DataFrame. It must match the R function's output.
If schema is not specified, each partition of the result DataFrame will be serialized in R into a single byte array. Such resulting DataFrame can be processed by successive calls to dapply().
## How was this patch tested?
SparkR unit tests.
Author: Sun Rui <rui.sun@intel.com>
Author: Sun Rui <sunrui2016@gmail.com>
Closes #12493 from sun-rui/SPARK-12919.
Diffstat (limited to 'R/pkg/inst/tests/testthat/test_sparkSQL.R')
-rw-r--r-- | R/pkg/inst/tests/testthat/test_sparkSQL.R | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 7058265ea3..5cf9dc405b 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2043,6 +2043,46 @@ test_that("Histogram", { df <- as.DataFrame(sqlContext, data.frame(x = c(1, 2, 3, 4, 100))) expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1)) }) + +test_that("dapply() on a DataFrame", { + df <- createDataFrame ( + sqlContext, + list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), + c("a", "b", "c")) + ldf <- collect(df) + df1 <- dapply(df, function(x) { x }, schema(df)) + result <- collect(df1) + expect_identical(ldf, result) + + + # Filter and add a column + schema <- structType(structField("a", "integer"), structField("b", "double"), + structField("c", "string"), structField("d", "integer")) + df1 <- dapply( + df, + function(x) { + y <- x[x$a > 1, ] + y <- cbind(y, y$a + 1L) + }, + schema) + result <- collect(df1) + expected <- ldf[ldf$a > 1, ] + expected$d <- expected$a + 1L + rownames(expected) <- NULL + expect_identical(expected, result) + + # Remove the added column + df2 <- dapply( + df1, + function(x) { + x[, c("a", "b", "c")] + }, + schema(df)) + result <- collect(df2) + expected <- expected[, c("a", "b", "c")] + expect_identical(expected, result) +}) + unlink(parquetPath) unlink(jsonPath) unlink(jsonPathNa) |