aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorhlin09 <hlin09pu@gmail.com>2015-04-27 15:04:37 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-04-27 15:04:37 -0700
commitca9f4ebb8e510e521bf4df0331375ddb385fb9d2 (patch)
tree768b88696324e9e65fdfaaccacea92ee82bb250e /R
parentef82bddc11d1aea42e22d2f85613a869cbe9a990 (diff)
downloadspark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.tar.gz
spark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.tar.bz2
spark-ca9f4ebb8e510e521bf4df0331375ddb385fb9d2.zip
[SPARK-6991] [SPARKR] Adds support for zipPartitions.
Author: hlin09 <hlin09pu@gmail.com> Closes #5568 from hlin09/zipPartitions and squashes the following commits: 12c08a5 [hlin09] Fix comments d2d32db [hlin09] Merge branch 'master' into zipPartitions ec56d2f [hlin09] Fix test. 27655d3 [hlin09] Adds support for zipPartitions.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/NAMESPACE1
-rw-r--r--R/pkg/R/RDD.R46
-rw-r--r--R/pkg/R/generics.R5
-rw-r--r--R/pkg/inst/tests/test_binary_function.R33
4 files changed, 85 insertions, 0 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8028364386..e077eace74 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -71,6 +71,7 @@ exportMethods(
"unpersist",
"value",
"values",
+ "zipPartitions",
"zipRDD",
"zipWithIndex",
"zipWithUniqueId"
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index f90c26b253..a3a0421a07 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1595,3 +1595,49 @@ setMethod("intersection",
keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})
+
+#' Zips an RDD's partitions with one (or more) RDD(s).
+#' Same as zipPartitions in Spark.
+#'
+#' @param ... RDDs to be zipped.
+#' @param func A function to transform zipped partitions.
+#' @return A new RDD by applying a function to the zipped partitions.
+#' Assumes that all the RDDs have the *same number of partitions*, but
+#' does *not* require them to have the same number of elements in each partition.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
+#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
+#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
+#' collect(zipPartitions(rdd1, rdd2, rdd3,
+#' func = function(x, y, z) { list(list(x, y, z))} ))
+#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
+#'}
+#' @rdname zipRDD
+#' @aliases zipPartitions,RDD
+setMethod("zipPartitions",
+ "RDD",
+ function(..., func) {
+ rrdds <- list(...)
+ if (length(rrdds) == 1) {
+ return(rrdds[[1]])
+ }
+ nPart <- sapply(rrdds, numPartitions)
+ if (length(unique(nPart)) != 1) {
+ stop("Can only zipPartitions RDDs which have the same number of partitions.")
+ }
+
+ rrdds <- lapply(rrdds, function(rdd) {
+ mapPartitionsWithIndex(rdd, function(partIndex, part) {
+ print(length(part))
+ list(list(partIndex, part))
+ })
+ })
+ union.rdd <- Reduce(unionRDD, rrdds)
+ zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
+ res <- mapPartitions(zipped.rdd, function(plist) {
+ do.call(func, plist[[1]])
+ })
+ res
+ })
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 34dbe84051..e88729387e 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
#' @export
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
+#' @rdname zipRDD
+#' @export
+setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
+ signature = "...")
+
#' @rdname zipWithIndex
#' @seealso zipWithUniqueId
#' @export
diff --git a/R/pkg/inst/tests/test_binary_function.R b/R/pkg/inst/tests/test_binary_function.R
index c15553ba28..6785a7bdae 100644
--- a/R/pkg/inst/tests/test_binary_function.R
+++ b/R/pkg/inst/tests/test_binary_function.R
@@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", {
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
})
+
+test_that("zipPartitions() on RDDs", {
+ rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
+ rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
+ rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
+ actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
+ func = function(x, y, z) { list(list(x, y, z))} ))
+ expect_equal(actual,
+ list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
+
+ mockFile = c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName, 1)
+ actual <- collect(zipPartitions(rdd, rdd,
+ func = function(x, y) { list(paste(x, y, sep = "\n")) }))
+ expected <- list(paste(mockFile, mockFile, sep = "\n"))
+ expect_equal(actual, expected)
+
+ rdd1 <- parallelize(sc, 0:1, 1)
+ actual <- collect(zipPartitions(rdd1, rdd,
+ func = function(x, y) { list(x + nchar(y)) }))
+ expected <- list(0:1 + nchar(mockFile))
+ expect_equal(actual, expected)
+
+ rdd <- map(rdd, function(x) { x })
+ actual <- collect(zipPartitions(rdd, rdd1,
+ func = function(x, y) { list(y + nchar(x)) }))
+ expect_equal(actual, expected)
+
+ unlink(fileName)
+})