diff options
author | Felix Cheung <felixcheung_m@hotmail.com> | 2017-02-15 10:45:37 -0800 |
---|---|---|
committer | Felix Cheung <felixcheung@apache.org> | 2017-02-15 10:45:37 -0800 |
commit | 671bc08ed502815bfa2254c30d64149402acb0c7 (patch) | |
tree | 3edcf2548e8f58a6a27db9c16050a3ff1d8ae261 /R/pkg/inst/tests/testthat/test_sparkSQL.R | |
parent | c97f4e17de0ce39e8172a5a4ae81f1914816a358 (diff) | |
download | spark-671bc08ed502815bfa2254c30d64149402acb0c7.tar.gz spark-671bc08ed502815bfa2254c30d64149402acb0c7.tar.bz2 spark-671bc08ed502815bfa2254c30d64149402acb0c7.zip |
[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
## What changes were proposed in this pull request?
Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column
## How was this patch tested?
manual, unit tests
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes #16739 from felixcheung/rcoalesce.
Diffstat (limited to 'R/pkg/inst/tests/testthat/test_sparkSQL.R')
-rw-r--r-- | R/pkg/inst/tests/testthat/test_sparkSQL.R | 32 |
1 files changed, 27 insertions, 5 deletions
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 1494ebb3de..199eb2057f 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", { objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp") df <- read.json(jsonPath) dfRDD <- toRDD(df) - saveAsObjectFile(coalesce(dfRDD, 1L), objectPath) + saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath) objectIn <- objectFile(sc, objectPath) expect_is(objectIn, "RDD") @@ -1236,7 +1236,7 @@ test_that("column functions", { c16 <- is.nan(c) + isnan(c) + isNaN(c) c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1") c18 <- covar_pop(c, c1) + covar_pop("c", "c1") - c19 <- spark_partition_id() + c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3) c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy") # Test if base::is.nan() is exposed @@ -2491,15 +2491,18 @@ test_that("repartition by columns on DataFrame", { ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE) # repartition by column and number of partitions - actual <- repartition(df, 3L, col = df$"a") + actual <- repartition(df, 3, col = df$"a") - # since we cannot access the number of partitions from dataframe, checking - # that at least the dimensions are identical + # Checking that at least the dimensions are identical expect_identical(dim(df), dim(actual)) + expect_equal(getNumPartitions(actual), 3L) # repartition by number of partitions actual <- repartition(df, 13L) expect_identical(dim(df), dim(actual)) + expect_equal(getNumPartitions(actual), 13L) + + expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L) # a test case with a column and dapply schema <- structType(structField("a", "integer"), structField("avg", "double")) @@ -2515,6 +2518,25 @@ test_that("repartition by columns on DataFrame", { expect_equal(nrow(df1), 2) }) +test_that("coalesce, repartition, numPartitions", { + df <- as.DataFrame(cars, numPartitions = 5) + expect_equal(getNumPartitions(df), 5) + expect_equal(getNumPartitions(coalesce(df, 3)), 3) + expect_equal(getNumPartitions(coalesce(df, 6)), 5) + + df1 <- coalesce(df, 3) + expect_equal(getNumPartitions(df1), 3) + expect_equal(getNumPartitions(coalesce(df1, 6)), 5) + expect_equal(getNumPartitions(coalesce(df1, 4)), 4) + expect_equal(getNumPartitions(coalesce(df1, 2)), 2) + + df2 <- repartition(df1, 10) + expect_equal(getNumPartitions(df2), 10) + expect_equal(getNumPartitions(coalesce(df2, 13)), 5) + expect_equal(getNumPartitions(coalesce(df2, 7)), 5) + expect_equal(getNumPartitions(coalesce(df2, 3)), 3) +}) + test_that("gapply() and gapplyCollect() on a DataFrame", { df <- createDataFrame ( list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), |