From 671bc08ed502815bfa2254c30d64149402acb0c7 Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Wed, 15 Feb 2017 10:45:37 -0800
Subject: [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column

## What changes were proposed in this pull request?

Add coalesce on DataFrame to reduce the number of partitions without a shuffle, and add coalesce on Column.

## How was this patch tested?

Manual tests and unit tests.

Author: Felix Cheung

Closes #16739 from felixcheung/rcoalesce.
---
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 46 +++++++++++++++++++++++++++++--
 R/pkg/R/RDD.R                             |  4 +--
 R/pkg/R/functions.R                       | 26 +++++++++++++++--
 R/pkg/R/generics.R                        |  9 +++++-
 R/pkg/inst/tests/testthat/test_rdd.R      |  2 +-
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 32 +++++++++++++++++----
 7 files changed, 106 insertions(+), 14 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8b265006cb..81e19364ae 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "coalesce",
               "collect",
               "colnames",
               "colnames<-",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5bca4105fc..cf331bab47 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -678,14 +678,53 @@ setMethod("storageLevel",
             storageLevelToString(callJMethod(x@sdf, "storageLevel"))
           })
 
+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions) {
+            stopifnot(is.numeric(numPartitions))
+            sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
 #' Repartition
 #'
 #' The following options for repartition are possible:
 #' \itemize{
-#'  \item{1.} {Return a new SparkDataFrame partitioned by
+#'  \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#'  \item{2.} {Return a new SparkDataFrame hash partitioned by
 #'             the given columns into \code{numPartitions}.}
-#'  \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#'  \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#'  \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
 #'             using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
 #' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
 #' @export
 #' @examples
 #'\dontrun{
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 91bab332c2..5667b9d788 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
           signature(x = "RDD"),
           function(x, numPartitions) {
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-              coalesce(x, numPartitions, TRUE)
+              coalesceRDD(x, numPartitions, TRUE)
             } else {
               stop("Please, specify the number of partitions")
             }
@@ -1049,7 +1049,7 @@ setMethod("repartitionRDD",
 #' @rdname coalesce
 #' @aliases coalesce,RDD
 #' @noRd
-setMethod("coalesce",
+setMethod("coalesceRDD",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 032cfecfc0..9e5084481f 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -286,6 +286,28 @@ setMethod("ceil",
             column(jc)
           })
 
+#' Returns the first column that is not NA
+#'
+#' Returns the first column that is not NA, or NA if all inputs are.
+#'
+#' @rdname coalesce
+#' @name coalesce
+#' @family normal_funcs
+#' @export
+#' @aliases coalesce,Column-method
+#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
+#' @note coalesce(Column) since 2.1.1
+setMethod("coalesce",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
+            column(jc)
+          })
+
 #' Though scala functions has "col" function, we don't expose it in SparkR
 #' because we don't want to conflict with the "col" function in the R base
 #' package and we also have "column" function exported which is an alias of "col".
@@ -297,7 +319,7 @@ col <- function(x) {
 #' Returns a Column based on the given column name
 #'
 #' Returns a Column based on the given column name.
-#
+#'
 #' @param x Character column name.
 #'
 #' @rdname column
@@ -305,7 +327,7 @@ col <- function(x) {
 #' @family normal_funcs
 #' @export
 #' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
 #' @note column since 1.6.0
 setMethod("column",
           signature(x = "character"),
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 0d9a9968e2..68864e6fe1 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
 # @rdname coalesce
 # @seealso repartition
 # @export
-setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })
 
 # @rdname checkpoint-methods
 # @export
@@ -406,6 +406,13 @@ setGeneric("attach")
 #' @export
 setGeneric("cache", function(x) { standardGeneric("cache") })
 
+#' @rdname coalesce
+#' @param x a Column or a SparkDataFrame.
+#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
+#'        provided.
+#' @export
+setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })
+
 #' @rdname collect
 #' @export
 setGeneric("collect", function(x, ...) { standardGeneric("collect") })
{ standardGeneric("collect") }) diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R index ceb31bd896..787ef51c50 100644 --- a/R/pkg/inst/tests/testthat/test_rdd.R +++ b/R/pkg/inst/tests/testthat/test_rdd.R @@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", { expect_true(count >= 0 && count <= 4) # coalesce - r3 <- coalesce(rdd, 1) + r3 <- coalesceRDD(rdd, 1) expect_equal(getNumPartitionsRDD(r3), 1L) count <- length(collectPartition(r3, 0L)) expect_equal(count, 20) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 1494ebb3de..199eb2057f 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", { objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp") df <- read.json(jsonPath) dfRDD <- toRDD(df) - saveAsObjectFile(coalesce(dfRDD, 1L), objectPath) + saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath) objectIn <- objectFile(sc, objectPath) expect_is(objectIn, "RDD") @@ -1236,7 +1236,7 @@ test_that("column functions", { c16 <- is.nan(c) + isnan(c) + isNaN(c) c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1") c18 <- covar_pop(c, c1) + covar_pop("c", "c1") - c19 <- spark_partition_id() + c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3) c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy") # Test if base::is.nan() is exposed @@ -2491,15 +2491,18 @@ test_that("repartition by columns on DataFrame", { ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE) # repartition by column and number of partitions - actual <- repartition(df, 3L, col = df$"a") + actual <- repartition(df, 3, col = df$"a") - # since we cannot access the number of partitions from dataframe, checking - # that at least the dimensions are identical + # Checking that at least the dimensions are identical expect_identical(dim(df), dim(actual)) + expect_equal(getNumPartitions(actual), 3L) # repartition by number of partitions actual <- repartition(df, 13L) expect_identical(dim(df), dim(actual)) + expect_equal(getNumPartitions(actual), 13L) + + expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L) # a test case with a column and dapply schema <- structType(structField("a", "integer"), structField("avg", "double")) @@ -2515,6 +2518,25 @@ test_that("repartition by columns on DataFrame", { expect_equal(nrow(df1), 2) }) +test_that("coalesce, repartition, numPartitions", { + df <- as.DataFrame(cars, numPartitions = 5) + expect_equal(getNumPartitions(df), 5) + expect_equal(getNumPartitions(coalesce(df, 3)), 3) + expect_equal(getNumPartitions(coalesce(df, 6)), 5) + + df1 <- coalesce(df, 3) + expect_equal(getNumPartitions(df1), 3) + expect_equal(getNumPartitions(coalesce(df1, 6)), 5) + expect_equal(getNumPartitions(coalesce(df1, 4)), 4) + expect_equal(getNumPartitions(coalesce(df1, 2)), 2) + + df2 <- repartition(df1, 10) + expect_equal(getNumPartitions(df2), 10) + expect_equal(getNumPartitions(coalesce(df2, 13)), 5) + expect_equal(getNumPartitions(coalesce(df2, 7)), 5) + expect_equal(getNumPartitions(coalesce(df2, 3)), 3) +}) + test_that("gapply() and gapplyCollect() on a DataFrame", { df <- createDataFrame ( list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), -- cgit v1.2.3