author     NarineK <narine.kokhlikyan@us.ibm.com>   2016-05-05 12:00:55 -0700
committer  Davies Liu <davies.liu@gmail.com>        2016-05-05 12:00:55 -0700
commit     22226fcc926f9d3b8aa7b47dcd9847021e6a6879 (patch)
tree       5d3087226563265109c82c9130ff5de6b5eac8b2
parent     ac12b35d31ef1d1663511bf6ae826a9cc0278f20 (diff)
[SPARK-15110] [SPARKR] Implement repartitionByColumn for SparkR DataFrames
## What changes were proposed in this pull request?

Implement repartitionByColumn on DataFrame. This will allow us to run R functions on each partition identified by column groups with the dapply() method.

## How was this patch tested?

Unit tests.

Author: NarineK <narine.kokhlikyan@us.ibm.com>

Closes #12887 from NarineK/repartitionByColumns.
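To make the new API concrete, here is a minimal usage sketch (not part of the patch; the data and the column names "dept" and "salary" are hypothetical, and a running SparkR session with `sqlContext` is assumed, as in the roxygen examples below). It repartitions a SparkDataFrame by a column and then runs an R function on each partition with dapply():

```r
# Hedged sketch of the feature added by this patch (illustration only).
library(SparkR)

df <- createDataFrame(
  sqlContext,
  list(list("a", 100), list("a", 200), list("b", 300)),
  c("dept", "salary"))

# Hash-partition the rows by "dept" (spark.sql.shuffle.partitions partitions) ...
df <- repartition(df, col = df$"dept")

# ... so that dapply() sees each department's rows within a single partition.
schema <- structType(structField("dept", "string"),
                     structField("total", "double"))
totals <- dapply(df,
                 function(part) {
                   data.frame(part$dept[1], sum(part$salary))
                 },
                 schema)
head(totals)
```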
-rw-r--r--  R/pkg/R/DataFrame.R                                          37
-rw-r--r--  R/pkg/R/RDD.R                                                 8
-rw-r--r--  R/pkg/R/generics.R                                            2
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R                    36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala    5
5 files changed, 78 insertions(+), 10 deletions(-)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 9e30fa0dbf..fcf473ac7b 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -570,10 +570,17 @@ setMethod("unpersist",
#' Repartition
#'
-#' Return a new SparkDataFrame that has exactly numPartitions partitions.
-#'
+#' The following options for repartition are possible:
+#' \itemize{
+#' \item{"Option 1"} {Return a new SparkDataFrame partitioned by
+#' the given columns into `numPartitions`.}
+#' \item{"Option 2"} {Return a new SparkDataFrame that has exactly `numPartitions`.}
+#' \item{"Option 3"} {Return a new SparkDataFrame partitioned by the given column(s),
+#' using `spark.sql.shuffle.partitions` as number of partitions.}
+#'}
#' @param x A SparkDataFrame
#' @param numPartitions The number of partitions to use.
+#' @param col The column by which the partitioning will be performed.
#'
#' @family SparkDataFrame functions
#' @rdname repartition
@@ -586,11 +593,31 @@ setMethod("unpersist",
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' newDF <- repartition(df, 2L)
+#' newDF <- repartition(df, numPartitions = 2L)
+#' newDF <- repartition(df, col = df$"col1", df$"col2")
+#' newDF <- repartition(df, 3L, col = df$"col1", df$"col2")
#'}
setMethod("repartition",
- signature(x = "SparkDataFrame", numPartitions = "numeric"),
- function(x, numPartitions) {
- sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+ signature(x = "SparkDataFrame"),
+ function(x, numPartitions = NULL, col = NULL, ...) {
+ if (!is.null(numPartitions) && is.numeric(numPartitions)) {
+ # number of partitions and columns both are specified
+ if (!is.null(col) && class(col) == "Column") {
+ cols <- list(col, ...)
+ jcol <- lapply(cols, function(c) { c@jc })
+ sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions), jcol)
+ } else {
+ # only number of partitions is specified
+ sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+ }
+ } else if (!is.null(col) && class(col) == "Column") {
+ # only columns are specified
+ cols <- list(col, ...)
+ jcol <- lapply(cols, function(c) { c@jc })
+ sdf <- callJMethod(x@sdf, "repartition", jcol)
+ } else {
+ stop("Please, specify the number of partitions and/or a column(s)")
+ }
dataFrame(sdf)
})
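Taken together, the branches above map the R call shapes onto the JVM Dataset.repartition overloads. A hedged summary sketch (`df` and its columns "a" and "b" are hypothetical, not from this patch):

```r
# Which branch each call shape takes (illustration only).
repartition(df, numPartitions = 10L)         # -> Dataset.repartition(numPartitions)
repartition(df, 10L, col = df$"a", df$"b")   # -> Dataset.repartition(numPartitions, cols)
repartition(df, col = df$"a", df$"b")        # -> Dataset.repartition(cols), i.e.
                                             #    spark.sql.shuffle.partitions partitions
repartition(df)                              # -> error: neither argument supplied
```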
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 34d29ddbfd..f1badf4364 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1023,9 +1023,13 @@ setMethod("keyBy",
#' @aliases repartition,RDD
#' @noRd
setMethod("repartition",
- signature(x = "RDD", numPartitions = "numeric"),
+ signature(x = "RDD"),
function(x, numPartitions) {
- coalesce(x, numPartitions, TRUE)
+ if (!is.null(numPartitions) && is.numeric(numPartitions)) {
+ coalesce(x, numPartitions, TRUE)
+ } else {
+ stop("Please, specify the number of partitions")
+ }
})
#' Return a new RDD that is reduced into numPartitions partitions.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index f936ea6039..3db1ac0766 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -167,7 +167,7 @@ setGeneric("reduce", function(x, func) { standardGeneric("reduce") })
# @rdname repartition
# @seealso coalesce
# @export
-setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
+setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
# @rdname sampleRDD
# @export
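For readers unfamiliar with S4 dispatch, a minimal standalone sketch (hypothetical class "Box" and generic "resize", not SparkR code) of why the generic's formals change from `(x, numPartitions)` to `(x, ...)`: dispatching on `x` alone and forwarding `...` lets a single method accept either of two optional arguments, as the repartition method above does with `numPartitions` and `col`:

```r
# Standalone S4 sketch (illustration only, assumes base R's methods package).
setClass("Box", slots = c(items = "numeric"))

# Dispatch only on x; everything else flows through "..." to the method.
setGeneric("resize", function(x, ...) { standardGeneric("resize") })

setMethod("resize", signature(x = "Box"),
          function(x, n = NULL, by = NULL, ...) {
            if (!is.null(n) && is.numeric(n)) {
              message("resizing to ", n, " slots")
            } else if (!is.null(by)) {
              message("resizing by key ", by)
            } else {
              stop("Please, specify n and/or by")
            }
            x
          })

b <- new("Box", items = c(1, 2, 3))
resize(b, n = 2L)          # resizing to 2 slots
resize(b, by = "width")    # resizing by key width
```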
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 081f7b1663..3b6a27c3b8 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2082,6 +2082,42 @@ test_that("dapply() on a DataFrame", {
expect_identical(expected, result)
})
+test_that("repartition by columns on DataFrame", {
+ df <- createDataFrame (
+ sqlContext,
+ list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+ c("a", "b", "c", "d"))
+
+ # no column and number of partitions specified
+ retError <- tryCatch(repartition(df), error = function(e) e)
+ expect_equal(grepl
+ ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
+
+ # repartition by column and number of partitions
+ actual <- repartition(df, 3L, col = df$"a")
+
+ # since we cannot access the number of partitions from dataframe, checking
+ # that at least the dimensions are identical
+ expect_identical(dim(df), dim(actual))
+
+ # repartition by number of partitions
+ actual <- repartition(df, 13L)
+ expect_identical(dim(df), dim(actual))
+
+ # a test case with a column and dapply
+ schema <- structType(structField("a", "integer"), structField("avg", "double"))
+ df <- repartition(df, col = df$"a")
+ df1 <- dapply(
+ df,
+ function(x) {
+ y <- (data.frame(x$a[1], mean(x$b)))
+ },
+ schema)
+
+ # Number of partitions is equal to 2
+ expect_equal(nrow(df1), 2)
+})
+
unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c77b13832c..dd73fb8dad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2178,8 +2178,9 @@ class Dataset[T] private[sql](
}
/**
- * Returns a new [[Dataset]] partitioned by the given partitioning expressions preserving
- * the existing number of partitions. The resulting Datasetis hash partitioned.
+ * Returns a new [[Dataset]] partitioned by the given partitioning expressions, using
+ * `spark.sql.shuffle.partitions` as number of partitions.
+ * The resulting Dataset is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*