about summary refs log tree commit diff
path: root/R/pkg/inst/tests/testthat/test_sparkSQL.R
diff options
context:
space:
mode:
author: Sun Rui <rui.sun@intel.com> 2016-04-29 16:41:07 -0700
committer: Shivaram Venkataraman <shivaram@cs.berkeley.edu> 2016-04-29 16:41:07 -0700
commit 4ae9fe091c2cb8388c581093d62d3deaef40993e (patch)
tree fd84ce605c0ea8bd9d0b2e307119bd5d8651c9f5 /R/pkg/inst/tests/testthat/test_sparkSQL.R
parent d78fbcc3cc9c379b4a548ebc816c6f71cc71a16e (diff)
download: spark-4ae9fe091c2cb8388c581093d62d3deaef40993e.tar.gz
spark-4ae9fe091c2cb8388c581093d62d3deaef40993e.tar.bz2
spark-4ae9fe091c2cb8388c581093d62d3deaef40993e.zip
[SPARK-12919][SPARKR] Implement dapply() on DataFrame in SparkR.
## What changes were proposed in this pull request? dapply() applies an R function on each partition of a DataFrame and returns a new DataFrame. The function signature is: dapply(df, function(localDF) {}, schema = NULL) R function input: local data.frame from the partition on local node R function output: local data.frame Schema specifies the Row format of the resulting DataFrame. It must match the R function's output. If schema is not specified, each partition of the result DataFrame will be serialized in R into a single byte array. Such resulting DataFrame can be processed by successive calls to dapply(). ## How was this patch tested? SparkR unit tests. Author: Sun Rui <rui.sun@intel.com> Author: Sun Rui <sunrui2016@gmail.com> Closes #12493 from sun-rui/SPARK-12919.
Diffstat (limited to 'R/pkg/inst/tests/testthat/test_sparkSQL.R')
-rw-r--r-- R/pkg/inst/tests/testthat/test_sparkSQL.R 40
1 file changed, 40 insertions, 0 deletions
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 7058265ea3..5cf9dc405b 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2043,6 +2043,46 @@ test_that("Histogram", {
df <- as.DataFrame(sqlContext, data.frame(x = c(1, 2, 3, 4, 100)))
expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1))
})
+
+test_that("dapply() on a DataFrame", {
+ df <- createDataFrame (
+ sqlContext,
+ list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
+ c("a", "b", "c"))
+ ldf <- collect(df)
+ df1 <- dapply(df, function(x) { x }, schema(df))
+ result <- collect(df1)
+ expect_identical(ldf, result)
+
+
+ # Filter and add a column
+ schema <- structType(structField("a", "integer"), structField("b", "double"),
+ structField("c", "string"), structField("d", "integer"))
+ df1 <- dapply(
+ df,
+ function(x) {
+ y <- x[x$a > 1, ]
+ y <- cbind(y, y$a + 1L)
+ },
+ schema)
+ result <- collect(df1)
+ expected <- ldf[ldf$a > 1, ]
+ expected$d <- expected$a + 1L
+ rownames(expected) <- NULL
+ expect_identical(expected, result)
+
+ # Remove the added column
+ df2 <- dapply(
+ df1,
+ function(x) {
+ x[, c("a", "b", "c")]
+ },
+ schema(df))
+ result <- collect(df2)
+ expected <- expected[, c("a", "b", "c")]
+ expect_identical(expected, result)
+})
+
unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)