author     Sun Rui <rui.sun@intel.com>                       2015-04-24 11:00:19 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2015-04-24 11:00:19 -0700
commit     ebb77b2aff085e71906b5de9d266ded89051af82 (patch)
tree       f36e4927c100bb07955b1bf31cd18aa0e7619d63 /R
parent     6e57d57b32ba2aa0514692074897b5edd34e0dd6 (diff)
[SPARK-7033] [SPARKR] Clean usage of split. Use partition instead where applicable.
Author: Sun Rui <rui.sun@intel.com>

Closes #5628 from sun-rui/SPARK-7033 and squashes the following commits:

046bc9e [Sun Rui] Clean split usage in tests.
d531c86 [Sun Rui] [SPARK-7033][SPARKR] Clean usage of split. Use partition instead where applicable.
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/R/RDD.R                36
-rw-r--r--  R/pkg/R/context.R            20
-rw-r--r--  R/pkg/R/pairRDD.R             8
-rw-r--r--  R/pkg/R/utils.R               2
-rw-r--r--  R/pkg/inst/tests/test_rdd.R  12

5 files changed, 39 insertions, 39 deletions
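The rename is mechanical but touches every partition-indexed callback in SparkR. A minimal sketch of the convention this patch adopts (assuming a running SparkR context via sparkR.init(), as in the package examples; the expected values match the test at the bottom of this diff):

sc <- sparkR.init()
rdd <- parallelize(sc, 1:10, 2L)
# The first callback argument, formerly named "split", is now "partIndex",
# matching Spark's partition terminology.
sums <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
  list(partIndex, Reduce("+", part))
})
collect(sums, flatten = FALSE)  # list(list(0, 15), list(1, 40))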
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 128431334c..cc09efb1e5 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
# prev_serializedMode is used during the delayed computation of JRDD in getJRDD
} else {
- pipelinedFunc <- function(split, iterator) {
- func(split, prev@func(split, iterator))
+ pipelinedFunc <- function(partIndex, part) {
+ func(partIndex, prev@func(partIndex, part))
}
.Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
@@ -306,7 +306,7 @@ setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
jrdd <- getJRDD(x)
- partitions <- callJMethod(jrdd, "splits")
+ partitions <- callJMethod(jrdd, "partitions")
callJMethod(partitions, "size")
})
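On the JVM side, the deprecated splits() call is replaced by partitions(); the result is unchanged. A quick check, reusing the sketch context above:

rdd4 <- parallelize(sc, 1:100, 4L)
numPartitions(rdd4)  # 4, now obtained via jrdd.partitions().size()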
@@ -452,8 +452,8 @@ setMethod("countByValue",
setMethod("lapply",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
- func <- function(split, iterator) {
- lapply(iterator, FUN)
+ func <- function(partIndex, part) {
+ lapply(part, FUN)
}
lapplyPartitionsWithIndex(X, func)
})
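Note that lapply never exposes the index to user code; only the internal wrapper handed to lapplyPartitionsWithIndex is renamed. Continuing the sketch above:

doubled <- lapply(rdd, function(x) x + x)
collect(doubled)  # list(2, 4, ..., 20); FUN sees elements only, never partIndex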
@@ -538,8 +538,8 @@ setMethod("mapPartitions",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 5L)
-#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
-#' split * Reduce("+", part) })
+#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+#' partIndex * Reduce("+", part) })
#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
#'}
#' @rdname lapplyPartitionsWithIndex
@@ -813,7 +813,7 @@ setMethod("distinct",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' rdd <- parallelize(sc, 1:10)
#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
#'}
@@ -825,14 +825,14 @@ setMethod("sampleRDD",
function(x, withReplacement, fraction, seed) {
# The sampler: takes a partition and returns its sampled version.
- samplingFunc <- function(split, part) {
+ samplingFunc <- function(partIndex, part) {
set.seed(seed)
res <- vector("list", length(part))
len <- 0
# Discards some random values to ensure each partition has a
# different random seed.
- runif(split)
+ runif(partIndex)
for (elem in part) {
if (withReplacement) {
@@ -989,8 +989,8 @@ setMethod("coalesce",
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
if (shuffle || numPartitions > SparkR::numPartitions(x)) {
- func <- function(s, part) {
- set.seed(s) # split as seed
+ func <- function(partIndex, part) {
+ set.seed(partIndex) # partIndex as seed
start <- as.integer(sample(numPartitions, 1) - 1)
lapply(seq_along(part),
function(i) {
@@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile",
#' Save this RDD as a text file, using string representations of elements.
#'
#' @param x The RDD to save
-#' @param path The directory where the splits of the text file are saved
+#' @param path The directory where the partitions of the text file are saved
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
@@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId",
function(x) {
n <- numPartitions(x)
- partitionFunc <- function(split, part) {
+ partitionFunc <- function(partIndex, part) {
mapply(
function(item, index) {
- list(item, (index - 1) * n + split)
+ list(item, (index - 1) * n + partIndex)
},
part,
seq_along(part),
@@ -1382,11 +1382,11 @@ setMethod("zipWithIndex",
startIndices <- Reduce("+", nums, accumulate = TRUE)
}
- partitionFunc <- function(split, part) {
- if (split == 0) {
+ partitionFunc <- function(partIndex, part) {
+ if (partIndex == 0) {
startIndex <- 0
} else {
- startIndex <- startIndices[[split]]
+ startIndex <- startIndices[[partIndex]]
}
mapply(
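The zipWithUniqueId arithmetic above reads more clearly with the new name: an element at 1-based position index within partition partIndex of an n-partition RDD gets the id (index - 1) * n + partIndex. A plain-R check of the formula (no Spark needed; the values are illustrative):

n <- 3L          # an RDD with 3 partitions
partIndex <- 1L  # the second partition (0-based)
index <- 2L      # the second element within that partition
(index - 1L) * n + partIndex  # 4; id %% n == partIndex, so ids never collide across partitions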
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index ebbb8fba10..b4845b6948 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -17,12 +17,12 @@
# context.R: SparkContext driven functions
-getMinSplits <- function(sc, minSplits) {
- if (is.null(minSplits)) {
+getMinPartitions <- function(sc, minPartitions) {
+ if (is.null(minPartitions)) {
defaultParallelism <- callJMethod(sc, "defaultParallelism")
- minSplits <- min(defaultParallelism, 2)
+ minPartitions <- min(defaultParallelism, 2)
}
- as.integer(minSplits)
+ as.integer(minPartitions)
}
#' Create an RDD from a text file.
@@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) {
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD where each item is of type \code{character}
#' @export
@@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) {
#' sc <- sparkR.init()
#' lines <- textFile(sc, "myfile.txt")
#'}
-textFile <- function(sc, path, minSplits = NULL) {
+textFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definition of the text file path
path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")
- jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+ jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
# jrdd is of type JavaRDD[String]
RDD(jrdd, "string")
}
@@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) {
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism.
#' @return RDD containing serialized R objects.
#' @seealso saveAsObjectFile
@@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) {
#' sc <- sparkR.init()
#' rdd <- objectFile(sc, "myfile")
#'}
-objectFile <- function(sc, path, minSplits = NULL) {
+objectFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definition of the text file path
path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")
- jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+ jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
# Assume the RDD contains serialized R objects.
RDD(jrdd, "byte")
}
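The user-facing signatures change from minSplits to minPartitions, mirroring SparkContext.textFile in Spark core. Usage after this patch (paths are the placeholder names from the docs above):

sc <- sparkR.init()
lines <- textFile(sc, "myfile.txt", minPartitions = 4L)
rdd <- objectFile(sc, "myfile")  # minPartitions = NULL falls back to
                                 # min(defaultParallelism, 2) via getMinPartitions()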
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 13efebc11c..f99b474ff8 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -206,8 +206,8 @@ setMethod("partitionBy",
get(name, .broadcastNames) })
jrdd <- getJRDD(x)
- # We create a PairwiseRRDD that extends RDD[(Array[Byte],
- # Array[Byte])], where the key is the hashed split, the value is
+ # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
+ # where the key is the target partition number, the value is
# the content (key-val pairs).
pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
callJMethod(jrdd, "rdd"),
@@ -866,8 +866,8 @@ setMethod("sampleByKey",
}
# The sampler: takes a partition and returns its sampled version.
- samplingFunc <- function(split, part) {
- set.seed(bitwXor(seed, split))
+ samplingFunc <- function(partIndex, part) {
+ set.seed(bitwXor(seed, partIndex))
res <- vector("list", length(part))
len <- 0
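sampleByKey mixes the caller's seed with the partition index so that each partition draws from a different random stream. The mixing is plain base R (seed value borrowed from the sampleRDD example above):

seed <- 1618L
bitwXor(seed, 0L)  # 1618 -- seed used in partition 0
bitwXor(seed, 1L)  # 1619 -- seed used in partition 1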
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 23305d3c67..0e7b7bd5a5 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -501,7 +501,7 @@ appendPartitionLengths <- function(x, other) {
# A result RDD.
mergePartitions <- function(rdd, zip) {
serializerMode <- getSerializedMode(rdd)
- partitionFunc <- function(split, part) {
+ partitionFunc <- function(partIndex, part) {
len <- length(part)
if (len > 0) {
if (serializerMode == "byte") {
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
index 3ba7d17163..d55af93e3e 100644
--- a/R/pkg/inst/tests/test_rdd.R
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -105,8 +105,8 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
rdd2 <- rdd
for (i in 1:12)
rdd2 <- lapplyPartitionsWithIndex(
- rdd2, function(split, part) {
- part <- as.list(unlist(part) * split + i)
+ rdd2, function(partIndex, part) {
+ part <- as.list(unlist(part) * partIndex + i)
})
rdd2 <- lapply(rdd2, function(x) x + x)
actual <- collect(rdd2)
@@ -121,8 +121,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
# PipelinedRDD
rdd2 <- lapplyPartitionsWithIndex(
rdd2,
- function(split, part) {
- part <- as.list(unlist(part) * split)
+ function(partIndex, part) {
+ part <- as.list(unlist(part) * partIndex)
})
cache(rdd2)
@@ -174,13 +174,13 @@ test_that("lapply with dependency", {
})
test_that("lapplyPartitionsWithIndex on RDDs", {
- func <- function(splitIndex, part) { list(splitIndex, Reduce("+", part)) }
+ func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
expect_equal(actual, list(list(0, 15), list(1, 40)))
pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
- mkTup <- function(splitIndex, part) { list(splitIndex, part) }
+ mkTup <- function(partIndex, part) { list(partIndex, part) }
actual <- collect(lapplyPartitionsWithIndex(
partitionBy(pairsRDD, 2L, partitionByParity),
mkTup),
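The parity partitioner in this last test routes odd keys to partition 0 and even keys to partition 1, which can be sanity-checked without Spark:

partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
sapply(c(1, 3, 4), partitionByParity)  # 0 0 1 -- keys 1 and 3 land in partition 0, key 4 in partition 1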