Author:    Felix Cheung <felixcheung_m@hotmail.com>  2017-02-15 10:45:37 -0800
Committer: Felix Cheung <felixcheung@apache.org>     2017-02-15 10:45:37 -0800
Commit:    671bc08ed502815bfa2254c30d64149402acb0c7
Tree:      3edcf2548e8f58a6a27db9c16050a3ff1d8ae261 /R
Parent:    c97f4e17de0ce39e8172a5a4ae81f1914816a358
[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
## What changes were proposed in this pull request?

Add coalesce on DataFrame for reducing the number of partitions without a shuffle, and coalesce on Column (returning the first non-NA value among its inputs).

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16739 from felixcheung/rcoalesce.
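For orientation, a minimal sketch of how the two new entry points are called (assuming an active SparkR session; `cars` is R's built-in dataset):

    df <- as.DataFrame(cars, numPartitions = 5)
    df1 <- coalesce(df, 1L)                     # SparkDataFrame: shrink partitions without a shuffle
    firstNonNA <- coalesce(df$speed, df$dist)   # Column: first non-NA value among the inputs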
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                           |  1
-rw-r--r--  R/pkg/R/DataFrame.R                       | 46
-rw-r--r--  R/pkg/R/RDD.R                             |  4
-rw-r--r--  R/pkg/R/functions.R                       | 26
-rw-r--r--  R/pkg/R/generics.R                        |  9
-rw-r--r--  R/pkg/inst/tests/testthat/test_rdd.R      |  2
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R | 32
7 files changed, 106 insertions(+), 14 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8b265006cb..81e19364ae 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,6 +82,7 @@ exportMethods("arrange",
"as.data.frame",
"attach",
"cache",
+ "coalesce",
"collect",
"colnames",
"colnames<-",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5bca4105fc..cf331bab47 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -678,14 +678,53 @@ setMethod("storageLevel",
storageLevelToString(callJMethod(x@sdf, "storageLevel"))
})
+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle; instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+ signature(x = "SparkDataFrame"),
+ function(x, numPartitions) {
+ stopifnot(is.numeric(numPartitions))
+ sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+ dataFrame(sdf)
+ })
+
#' Repartition
#'
#' The following options for repartition are possible:
#' \itemize{
-#' \item{1.} {Return a new SparkDataFrame partitioned by
+#' \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#' \item{2.} {Return a new SparkDataFrame hash partitioned by
#' the given columns into \code{numPartitions}.}
-#' \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#' \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#' \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
#' @rdname repartition
#' @name repartition
#' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
#' @export
#' @examples
#'\dontrun{
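To make the narrow-dependency behavior documented above concrete, here is a hedged SparkR sketch (assumes an active session; the partition counts shown follow the documented semantics):

    df <- as.DataFrame(cars, numPartitions = 10)

    # coalesce: narrow dependency, no shuffle; requesting more partitions is a no-op
    getNumPartitions(coalesce(df, 2L))      # 2
    getNumPartitions(coalesce(df, 20L))     # stays at 10

    # repartition: adds a shuffle, but can both grow and shrink the partition count
    getNumPartitions(repartition(df, 20L))  # 20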
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 91bab332c2..5667b9d788 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
signature(x = "RDD"),
function(x, numPartitions) {
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
- coalesce(x, numPartitions, TRUE)
+ coalesceRDD(x, numPartitions, TRUE)
} else {
stop("Please, specify the number of partitions")
}
@@ -1049,7 +1049,7 @@ setMethod("repartitionRDD",
#' @rdname coalesce
#' @aliases coalesce,RDD
#' @noRd
-setMethod("coalesce",
+setMethod("coalesceRDD",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 032cfecfc0..9e5084481f 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -286,6 +286,28 @@ setMethod("ceil",
column(jc)
})
+#' Returns the first column that is not NA
+#'
+#' Returns the first column that is not NA, or NA if all inputs are.
+#'
+#' @rdname coalesce
+#' @name coalesce
+#' @family normal_funcs
+#' @export
+#' @aliases coalesce,Column-method
+#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
+#' @note coalesce(Column) since 2.1.1
+setMethod("coalesce",
+ signature(x = "Column"),
+ function(x, ...) {
+ jcols <- lapply(list(x, ...), function (x) {
+ stopifnot(class(x) == "Column")
+ x@jc
+ })
+ jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
+ column(jc)
+ })
+
#' Though scala functions has "col" function, we don't expose it in SparkR
#' because we don't want to conflict with the "col" function in the R base
#' package and we also have "column" function exported which is an alias of "col".
@@ -297,7 +319,7 @@ col <- function(x) {
#' Returns a Column based on the given column name
#'
#' Returns a Column based on the given column name.
-#
+#'
#' @param x Character column name.
#'
#' @rdname column
@@ -305,7 +327,7 @@ col <- function(x) {
#' @family normal_funcs
#' @export
#' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
#' @note column since 1.6.0
setMethod("column",
signature(x = "character"),
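A small illustrative example of the Column-side coalesce added above (the local data frame here is hypothetical):

    ldf <- data.frame(x = c(NA, "a"), y = c("fallback", "b"), stringsAsFactors = FALSE)
    df <- createDataFrame(ldf)

    # Pick x where it is not NA, otherwise fall back to y
    collect(select(df, coalesce(df$x, df$y)))   # "fallback", "a"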
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 0d9a9968e2..68864e6fe1 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
# @rdname coalesce
# @seealso repartition
# @export
-setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })
# @rdname checkpoint-methods
# @export
@@ -406,6 +406,13 @@ setGeneric("attach")
#' @export
setGeneric("cache", function(x) { standardGeneric("cache") })
+#' @rdname coalesce
+#' @param x a Column or a SparkDataFrame.
+#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
+#' provided.
+#' @export
+setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })
+
#' @rdname collect
#' @export
setGeneric("collect", function(x, ...) { standardGeneric("collect") })
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index ceb31bd896..787ef51c50 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", {
expect_true(count >= 0 && count <= 4)
# coalesce
- r3 <- coalesce(rdd, 1)
+ r3 <- coalesceRDD(rdd, 1)
expect_equal(getNumPartitionsRDD(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 1494ebb3de..199eb2057f 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
df <- read.json(jsonPath)
dfRDD <- toRDD(df)
- saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
+ saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)
expect_is(objectIn, "RDD")
@@ -1236,7 +1236,7 @@ test_that("column functions", {
c16 <- is.nan(c) + isnan(c) + isNaN(c)
c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
- c19 <- spark_partition_id()
+ c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")
# Test if base::is.nan() is exposed
@@ -2491,15 +2491,18 @@ test_that("repartition by columns on DataFrame", {
("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
# repartition by column and number of partitions
- actual <- repartition(df, 3L, col = df$"a")
+ actual <- repartition(df, 3, col = df$"a")
- # since we cannot access the number of partitions from dataframe, checking
- # that at least the dimensions are identical
+ # Checking that at least the dimensions are identical
expect_identical(dim(df), dim(actual))
+ expect_equal(getNumPartitions(actual), 3L)
# repartition by number of partitions
actual <- repartition(df, 13L)
expect_identical(dim(df), dim(actual))
+ expect_equal(getNumPartitions(actual), 13L)
+
+ expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
# a test case with a column and dapply
schema <- structType(structField("a", "integer"), structField("avg", "double"))
@@ -2515,6 +2518,25 @@ test_that("repartition by columns on DataFrame", {
expect_equal(nrow(df1), 2)
})
+test_that("coalesce, repartition, numPartitions", {
+ df <- as.DataFrame(cars, numPartitions = 5)
+ expect_equal(getNumPartitions(df), 5)
+ expect_equal(getNumPartitions(coalesce(df, 3)), 3)
+ expect_equal(getNumPartitions(coalesce(df, 6)), 5)
+
+ df1 <- coalesce(df, 3)
+ expect_equal(getNumPartitions(df1), 3)
+ expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
+ expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
+ expect_equal(getNumPartitions(coalesce(df1, 2)), 2)
+
+ df2 <- repartition(df1, 10)
+ expect_equal(getNumPartitions(df2), 10)
+ expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
+ expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
+ expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
+})
+
test_that("gapply() and gapplyCollect() on a DataFrame", {
df <- createDataFrame (
list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),