aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/inst
diff options
context:
space:
mode:
authorFelix Cheung <felixcheung_m@hotmail.com>2017-02-15 10:45:37 -0800
committerFelix Cheung <felixcheung@apache.org>2017-02-15 10:45:37 -0800
commit671bc08ed502815bfa2254c30d64149402acb0c7 (patch)
tree3edcf2548e8f58a6a27db9c16050a3ff1d8ae261 /R/pkg/inst
parentc97f4e17de0ce39e8172a5a4ae81f1914816a358 (diff)
downloadspark-671bc08ed502815bfa2254c30d64149402acb0c7.tar.gz
spark-671bc08ed502815bfa2254c30d64149402acb0c7.tar.bz2
spark-671bc08ed502815bfa2254c30d64149402acb0c7.zip
[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
## What changes were proposed in this pull request? Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column ## How was this patch tested? manual, unit tests Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16739 from felixcheung/rcoalesce.
Diffstat (limited to 'R/pkg/inst')
-rw-r--r--R/pkg/inst/tests/testthat/test_rdd.R2
-rw-r--r--R/pkg/inst/tests/testthat/test_sparkSQL.R32
2 files changed, 28 insertions, 6 deletions
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index ceb31bd896..787ef51c50 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", {
expect_true(count >= 0 && count <= 4)
# coalesce
- r3 <- coalesce(rdd, 1)
+ r3 <- coalesceRDD(rdd, 1)
expect_equal(getNumPartitionsRDD(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 1494ebb3de..199eb2057f 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
df <- read.json(jsonPath)
dfRDD <- toRDD(df)
- saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
+ saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)
expect_is(objectIn, "RDD")
@@ -1236,7 +1236,7 @@ test_that("column functions", {
c16 <- is.nan(c) + isnan(c) + isNaN(c)
c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
- c19 <- spark_partition_id()
+ c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")
# Test if base::is.nan() is exposed
@@ -2491,15 +2491,18 @@ test_that("repartition by columns on DataFrame", {
("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
# repartition by column and number of partitions
- actual <- repartition(df, 3L, col = df$"a")
+ actual <- repartition(df, 3, col = df$"a")
- # since we cannot access the number of partitions from dataframe, checking
- # that at least the dimensions are identical
+ # Checking that at least the dimensions are identical
expect_identical(dim(df), dim(actual))
+ expect_equal(getNumPartitions(actual), 3L)
# repartition by number of partitions
actual <- repartition(df, 13L)
expect_identical(dim(df), dim(actual))
+ expect_equal(getNumPartitions(actual), 13L)
+
+ expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
# a test case with a column and dapply
schema <- structType(structField("a", "integer"), structField("avg", "double"))
@@ -2515,6 +2518,25 @@ test_that("repartition by columns on DataFrame", {
expect_equal(nrow(df1), 2)
})
+test_that("coalesce, repartition, numPartitions", {
+ df <- as.DataFrame(cars, numPartitions = 5)
+ expect_equal(getNumPartitions(df), 5)
+ expect_equal(getNumPartitions(coalesce(df, 3)), 3)
+ expect_equal(getNumPartitions(coalesce(df, 6)), 5)
+
+ df1 <- coalesce(df, 3)
+ expect_equal(getNumPartitions(df1), 3)
+ expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
+ expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
+ expect_equal(getNumPartitions(coalesce(df1, 2)), 2)
+
+ df2 <- repartition(df1, 10)
+ expect_equal(getNumPartitions(df2), 10)
+ expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
+ expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
+ expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
+})
+
test_that("gapply() and gapplyCollect() on a DataFrame", {
df <- createDataFrame (
list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),