aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst
diff options
context:
space:
mode:
authorSun Rui <sunrui2016@gmail.com>2016-05-12 17:50:55 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2016-05-12 17:50:55 -0700
commitb3930f74a0929b2cdcbbe5cbe34f0b1d35eb01cc (patch)
tree6c08e7b8ca13d7e73f5c667b94bb6039211fd2bc /R/pkg/inst
parentbb1362eb3b36b553dca246b95f59ba7fd8adcc8a (diff)
downloadspark-b3930f74a0929b2cdcbbe5cbe34f0b1d35eb01cc.tar.gz
spark-b3930f74a0929b2cdcbbe5cbe34f0b1d35eb01cc.tar.bz2
spark-b3930f74a0929b2cdcbbe5cbe34f0b1d35eb01cc.zip
[SPARK-15202][SPARKR] add dapplyCollect() method for DataFrame in SparkR.
## What changes were proposed in this pull request? dapplyCollect() applies an R function on each partition of a SparkDataFrame and collects the result back to R as a data.frame. ``` dapplyCollect(df, function(ldf) {...}) ``` ## How was this patch tested? SparkR unit tests. Author: Sun Rui <sunrui2016@gmail.com> Closes #12989 from sun-rui/SPARK-15202.
Diffstat (limited to 'R/pkg/inst')
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R21
1 files changed, 20 insertions, 1 deletions
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 0f67bc2e33..6a99b43e5a 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2043,7 +2043,7 @@ test_that("Histogram", {
expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1))
})
-test_that("dapply() on a DataFrame", {
+test_that("dapply() and dapplyCollect() on a DataFrame", {
df <- createDataFrame (
sqlContext,
list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
@@ -2053,6 +2053,8 @@ test_that("dapply() on a DataFrame", {
result <- collect(df1)
expect_identical(ldf, result)
+ result <- dapplyCollect(df, function(x) { x })
+ expect_identical(ldf, result)
# Filter and add a column
schema <- structType(structField("a", "integer"), structField("b", "double"),
@@ -2070,6 +2072,16 @@ test_that("dapply() on a DataFrame", {
rownames(expected) <- NULL
expect_identical(expected, result)
+ result <- dapplyCollect(
+ df,
+ function(x) {
+ y <- x[x$a > 1, ]
+ y <- cbind(y, y$a + 1L)
+ })
+ expected1 <- expected
+ names(expected1) <- names(result)
+ expect_identical(expected1, result)
+
# Remove the added column
df2 <- dapply(
df1,
@@ -2080,6 +2092,13 @@ test_that("dapply() on a DataFrame", {
result <- collect(df2)
expected <- expected[, c("a", "b", "c")]
expect_identical(expected, result)
+
+ result <- dapplyCollect(
+ df1,
+ function(x) {
+ x[, c("a", "b", "c")]
+ })
+ expect_identical(expected, result)
})
test_that("repartition by columns on DataFrame", {