aboutsummaryrefslogtreecommitdiff
path: root/R/pkg/R/RDD.R
diff options
context:
space:
mode:
Diffstat (limited to 'R/pkg/R/RDD.R')
-rw-r--r--R/pkg/R/RDD.R55
1 files changed, 33 insertions, 22 deletions
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 47945c2825..00c40c38ca 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -306,17 +306,28 @@ setMethod("checkpoint",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
-#' numPartitions(rdd) # 2L
+#' getNumPartitions(rdd) # 2L
#'}
-#' @rdname numPartitions
+#' @rdname getNumPartitions
+#' @aliases getNumPartitions,RDD-method
+#' @noRd
+setMethod("getNumPartitions",
+ signature(x = "RDD"),
+ function(x) {
+ callJMethod(getJRDD(x), "getNumPartitions")
+ })
+
+#' Gets the number of partitions of an RDD; equivalent to getNumPartitions.
+#' This function is deprecated; use getNumPartitions instead.
+#'
+#' @rdname getNumPartitions
#' @aliases numPartitions,RDD-method
#' @noRd
setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
- jrdd <- getJRDD(x)
- partitions <- callJMethod(jrdd, "partitions")
- callJMethod(partitions, "size")
+ .Deprecated("getNumPartitions")
+ getNumPartitions(x)
})
#' Collect elements of an RDD
@@ -443,7 +454,7 @@ setMethod("countByValue",
signature(x = "RDD"),
function(x) {
ones <- lapply(x, function(item) { list(item, 1L) })
- collect(reduceByKey(ones, `+`, numPartitions(x)))
+ collect(reduceByKey(ones, `+`, getNumPartitions(x)))
})
#' Apply a function to all elements
@@ -759,7 +770,7 @@ setMethod("take",
resList <- list()
index <- -1
jrdd <- getJRDD(x)
- numPartitions <- numPartitions(x)
+ numPartitions <- getNumPartitions(x)
serializedModeRDD <- getSerializedMode(x)
# TODO(shivaram): Collect more than one partition based on size
@@ -823,7 +834,7 @@ setMethod("first",
#' @noRd
setMethod("distinct",
signature(x = "RDD"),
- function(x, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, numPartitions = SparkR:::getNumPartitions(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
@@ -993,8 +1004,8 @@ setMethod("keyBy",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
-#' numPartitions(rdd) # 4
-#' numPartitions(repartition(rdd, 2L)) # 2
+#' getNumPartitions(rdd) # 4
+#' getNumPartitions(repartition(rdd, 2L)) # 2
#'}
#' @rdname repartition
#' @aliases repartition,RDD
@@ -1014,8 +1025,8 @@ setMethod("repartition",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
-#' numPartitions(rdd) # 3
-#' numPartitions(coalesce(rdd, 1L)) # 1
+#' getNumPartitions(rdd) # 3
+#' getNumPartitions(coalesce(rdd, 1L)) # 1
#'}
#' @rdname coalesce
#' @aliases coalesce,RDD
@@ -1024,7 +1035,7 @@ setMethod("coalesce",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
- if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
+ if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1112,7 +1123,7 @@ setMethod("saveAsTextFile",
#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
- function(x, func, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
values(sortByKey(keyBy(x, func), ascending, numPartitions))
})
@@ -1144,7 +1155,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
- numPartitions <- numPartitions(newRdd)
+ numPartitions <- getNumPartitions(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)
while (TRUE) {
@@ -1368,7 +1379,7 @@ setMethod("setName",
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
- n <- numPartitions(x)
+ n <- getNumPartitions(x)
partitionFunc <- function(partIndex, part) {
mapply(
@@ -1409,7 +1420,7 @@ setMethod("zipWithUniqueId",
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
- n <- numPartitions(x)
+ n <- getNumPartitions(x)
if (n > 1) {
nums <- collect(lapplyPartition(x,
function(part) {
@@ -1521,8 +1532,8 @@ setMethod("unionRDD",
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
- n1 <- numPartitions(x)
- n2 <- numPartitions(other)
+ n1 <- getNumPartitions(x)
+ n2 <- getNumPartitions(other)
if (n1 != n2) {
stop("Can only zip RDDs which have the same number of partitions.")
}
@@ -1588,7 +1599,7 @@ setMethod("cartesian",
#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
- function(x, other, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
@@ -1620,7 +1631,7 @@ setMethod("subtract",
#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
- function(x, other, numPartitions = SparkR:::numPartitions(x)) {
+ function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })
@@ -1661,7 +1672,7 @@ setMethod("zipPartitions",
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
- nPart <- sapply(rrdds, numPartitions)
+ nPart <- sapply(rrdds, getNumPartitions)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}