aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2015-12-05 16:39:01 -0800
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-12-05 16:39:01 -0800
commit6979edf4e1a93caafa8d286692097dd377d7616d (patch)
treec2e98565ec3a986e62b5c77b229c59dc7747888a /R
parent895b6c474735d7e0a38283f92292daa5c35875ee (diff)
downloadspark-6979edf4e1a93caafa8d286692097dd377d7616d.tar.gz
spark-6979edf4e1a93caafa8d286692097dd377d7616d.tar.bz2
spark-6979edf4e1a93caafa8d286692097dd377d7616d.zip
[SPARK-12115][SPARKR] Change numPartitions() to getNumPartitions() to be consistent with Scala/Python
Change ```numPartitions()``` to ```getNumPartitions()``` to be consistent with Scala/Python. <del>Note: If we can not catch up with 1.6 release, it will be breaking change for 1.7 that we also need to explain in release note.</del> cc sun-rui felixcheung shivaram Author: Yanbo Liang <ybliang8@gmail.com> Closes #10123 from yanboliang/spark-12115.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/R/RDD.R55
-rw-r--r--R/pkg/R/generics.R6
-rw-r--r--R/pkg/R/pairRDD.R4
-rw-r--r--R/pkg/inst/tests/test_rdd.R10
4 files changed, 45 insertions, 30 deletions
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 47945c2825..00c40c38ca 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -306,17 +306,28 @@ setMethod("checkpoint",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
-#' numPartitions(rdd) # 2L
+#' getNumPartitions(rdd) # 2L
#'}
-#' @rdname numPartitions
+#' @rdname getNumPartitions
+#' @aliases getNumPartitions,RDD-method
+#' @noRd
+setMethod("getNumPartitions",
+ signature(x = "RDD"),
+ function(x) {
+ callJMethod(getJRDD(x), "getNumPartitions")
+ })
+
+#' Gets the number of partitions of an RDD, the same as getNumPartitions.
+#' But this function has been deprecated, please use getNumPartitions.
+#'
+#' @rdname getNumPartitions
#' @aliases numPartitions,RDD-method
#' @noRd
setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
- jrdd <- getJRDD(x)
- partitions <- callJMethod(jrdd, "partitions")
- callJMethod(partitions, "size")
+ .Deprecated("getNumPartitions")
+ getNumPartitions(x)
})
#' Collect elements of an RDD
@@ -443,7 +454,7 @@ setMethod("countByValue",
signature(x = "RDD"),
function(x) {
ones <- lapply(x, function(item) { list(item, 1L) })
- collect(reduceByKey(ones, `+`, numPartitions(x)))
+ collect(reduceByKey(ones, `+`, getNumPartitions(x)))
})
#' Apply a function to all elements
@@ -759,7 +770,7 @@ setMethod("take",
resList <- list()
index <- -1
jrdd <- getJRDD(x)
- numPartitions <- numPartitions(x)
+ numPartitions <- getNumPartitions(x)
serializedModeRDD <- getSerializedMode(x)
# TODO(shivaram): Collect more than one partition based on size
@@ -823,7 +834,7 @@ setMethod("first",
#' @noRd
setMethod("distinct",
signature(x = "RDD"),
- function(x, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, numPartitions = SparkR:::getNumPartitions(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
@@ -993,8 +1004,8 @@ setMethod("keyBy",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
-#' numPartitions(rdd) # 4
-#' numPartitions(repartition(rdd, 2L)) # 2
+#' getNumPartitions(rdd) # 4
+#' getNumPartitions(repartition(rdd, 2L)) # 2
#'}
#' @rdname repartition
#' @aliases repartition,RDD
@@ -1014,8 +1025,8 @@ setMethod("repartition",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
-#' numPartitions(rdd) # 3
-#' numPartitions(coalesce(rdd, 1L)) # 1
+#' getNumPartitions(rdd) # 3
+#' getNumPartitions(coalesce(rdd, 1L)) # 1
#'}
#' @rdname coalesce
#' @aliases coalesce,RDD
@@ -1024,7 +1035,7 @@ setMethod("coalesce",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
- if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
+ if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1112,7 +1123,7 @@ setMethod("saveAsTextFile",
#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
- function(x, func, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
values(sortByKey(keyBy(x, func), ascending, numPartitions))
})
@@ -1144,7 +1155,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
- numPartitions <- numPartitions(newRdd)
+ numPartitions <- getNumPartitions(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)
while (TRUE) {
@@ -1368,7 +1379,7 @@ setMethod("setName",
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
- n <- numPartitions(x)
+ n <- getNumPartitions(x)
partitionFunc <- function(partIndex, part) {
mapply(
@@ -1409,7 +1420,7 @@ setMethod("zipWithUniqueId",
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
- n <- numPartitions(x)
+ n <- getNumPartitions(x)
if (n > 1) {
nums <- collect(lapplyPartition(x,
function(part) {
@@ -1521,8 +1532,8 @@ setMethod("unionRDD",
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
- n1 <- numPartitions(x)
- n2 <- numPartitions(other)
+ n1 <- getNumPartitions(x)
+ n2 <- getNumPartitions(other)
if (n1 != n2) {
stop("Can only zip RDDs which have the same number of partitions.")
}
@@ -1588,7 +1599,7 @@ setMethod("cartesian",
#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
- function(x, other, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
@@ -1620,7 +1631,7 @@ setMethod("subtract",
#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
- function(x, other, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })
@@ -1661,7 +1672,7 @@ setMethod("zipPartitions",
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
- nPart <- sapply(rrdds, numPartitions)
+ nPart <- sapply(rrdds, getNumPartitions)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index acfd4841e1..29dd11f41f 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -133,7 +133,11 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
# @export
setGeneric("name", function(x) { standardGeneric("name") })
-# @rdname numPartitions
+# @rdname getNumPartitions
+# @export
+setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
+
+# @rdname getNumPartitions
# @export
setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") })
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 991bea4d20..334c11d2f8 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -750,7 +750,7 @@ setMethod("cogroup",
#' @noRd
setMethod("sortByKey",
signature(x = "RDD"),
- function(x, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
rangeBounds <- list()
if (numPartitions > 1) {
@@ -818,7 +818,7 @@ setMethod("sortByKey",
#' @noRd
setMethod("subtractByKey",
signature(x = "RDD", other = "RDD"),
- function(x, other, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
filterFunction <- function(elem) {
iters <- elem[[2]]
(length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
index 71aed2bb9d..7423b4f2be 100644
--- a/R/pkg/inst/tests/test_rdd.R
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -28,8 +28,8 @@ intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
intRdd <- parallelize(sc, intPairs, 2L)
test_that("get number of partitions in RDD", {
- expect_equal(numPartitions(rdd), 2)
- expect_equal(numPartitions(intRdd), 2)
+ expect_equal(getNumPartitions(rdd), 2)
+ expect_equal(getNumPartitions(intRdd), 2)
})
test_that("first on RDD", {
@@ -304,18 +304,18 @@ test_that("repartition/coalesce on RDDs", {
# repartition
r1 <- repartition(rdd, 2)
- expect_equal(numPartitions(r1), 2L)
+ expect_equal(getNumPartitions(r1), 2L)
count <- length(collectPartition(r1, 0L))
expect_true(count >= 8 && count <= 12)
r2 <- repartition(rdd, 6)
- expect_equal(numPartitions(r2), 6L)
+ expect_equal(getNumPartitions(r2), 6L)
count <- length(collectPartition(r2, 0L))
expect_true(count >= 0 && count <= 4)
# coalesce
r3 <- coalesce(rdd, 1)
- expect_equal(numPartitions(r3), 1L)
+ expect_equal(getNumPartitions(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
})