path: root/R/pkg/R/pairRDD.R
Diffstat (limited to 'R/pkg/R/pairRDD.R')
-rw-r--r--  R/pkg/R/pairRDD.R  910
1 file changed, 462 insertions, 448 deletions
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 199c3fd6ab..991bea4d20 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -21,23 +21,24 @@ NULL
############ Actions and Transformations ############
-# Look up elements of a key in an RDD
-#
-# @description
-# \code{lookup} returns a list of values in this RDD for key key.
-#
-# @param x The RDD to collect
-# @param key The key to look up for
-# @return a list of values in this RDD for key key
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(c(1, 1), c(2, 2), c(1, 3))
-# rdd <- parallelize(sc, pairs)
-# lookup(rdd, 1) # list(1, 3)
-#}
-# @rdname lookup
-# @aliases lookup,RDD-method
+#' Look up elements of a key in an RDD
+#'
+#' @description
+#' \code{lookup} returns a list of values in this RDD for the given key.
+#'
+#' @param x The RDD to search.
+#' @param key The key to look up.
+#' @return A list of values in this RDD for the given key.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(c(1, 1), c(2, 2), c(1, 3))
+#' rdd <- parallelize(sc, pairs)
+#' lookup(rdd, 1) # list(1, 3)
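+#' # A further illustration, assuming the same rdd as above:
+#' # a key absent from the RDD should yield an empty list.
+#' lookup(rdd, 3) # expected: list()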
+#'}
+#' @rdname lookup
+#' @aliases lookup,RDD-method
+#' @noRd
setMethod("lookup",
signature(x = "RDD", key = "ANY"),
function(x, key) {
@@ -49,21 +50,22 @@ setMethod("lookup",
collect(valsRDD)
})
-# Count the number of elements for each key, and return the result to the
-# master as lists of (key, count) pairs.
-#
-# Same as countByKey in Spark.
-#
-# @param x The RDD to count keys.
-# @return list of (key, count) pairs, where count is number of each key in rdd.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
-# countByKey(rdd) # ("a", 2L), ("b", 1L)
-#}
-# @rdname countByKey
-# @aliases countByKey,RDD-method
+#' Count the number of elements for each key, and return the result to the
+#' master as lists of (key, count) pairs.
+#'
+#' Same as countByKey in Spark.
+#'
+#' @param x The RDD to count keys.
+#' @return list of (key, count) pairs, where count is the number of occurrences of each key in the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
+#' countByKey(rdd) # ("a", 2L), ("b", 1L)
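+#' # A further illustration: the result is a driver-side list with one
+#' # (key, count) pair per distinct key.
+#' length(countByKey(rdd)) # expected: 2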
+#'}
+#' @rdname countByKey
+#' @aliases countByKey,RDD-method
+#' @noRd
setMethod("countByKey",
signature(x = "RDD"),
function(x) {
@@ -71,17 +73,18 @@ setMethod("countByKey",
countByValue(keys)
})
-# Return an RDD with the keys of each tuple.
-#
-# @param x The RDD from which the keys of each tuple is returned.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-# collect(keys(rdd)) # list(1, 3)
-#}
-# @rdname keys
-# @aliases keys,RDD
+#' Return an RDD with the keys of each tuple.
+#'
+#' @param x The RDD from which the keys of each tuple are returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+#' collect(keys(rdd)) # list(1, 3)
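+#' # A further illustration: keys() returns an RDD, so RDD actions apply.
+#' count(keys(rdd)) # expected: 2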
+#'}
+#' @rdname keys
+#' @aliases keys,RDD
+#' @noRd
setMethod("keys",
signature(x = "RDD"),
function(x) {
@@ -91,17 +94,18 @@ setMethod("keys",
lapply(x, func)
})
-# Return an RDD with the values of each tuple.
-#
-# @param x The RDD from which the values of each tuple is returned.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-# collect(values(rdd)) # list(2, 4)
-#}
-# @rdname values
-# @aliases values,RDD
+#' Return an RDD with the values of each tuple.
+#'
+#' @param x The RDD from which the values of each tuple are returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+#' collect(values(rdd)) # list(2, 4)
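+#' # A further illustration: collected values can be reduced locally.
+#' sum(unlist(collect(values(rdd)))) # expected: 6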
+#'}
+#' @rdname values
+#' @aliases values,RDD
+#' @noRd
setMethod("values",
signature(x = "RDD"),
function(x) {
@@ -111,23 +115,24 @@ setMethod("values",
lapply(x, func)
})
-# Applies a function to all values of the elements, without modifying the keys.
-#
-# The same as `mapValues()' in Spark.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on the value of each element.
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# makePairs <- lapply(rdd, function(x) { list(x, x) })
-# collect(mapValues(makePairs, function(x) { x * 2) })
-# Output: list(list(1,2), list(2,4), list(3,6), ...)
-#}
-# @rdname mapValues
-# @aliases mapValues,RDD,function-method
+#' Applies a function to all values of the elements, without modifying the keys.
+#'
+#' The same as \code{mapValues()} in Spark.
+#'
+#' @param X The RDD to which the transformation is applied.
+#' @param FUN The transformation to apply to the value of each element.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' makePairs <- lapply(rdd, function(x) { list(x, x) })
+#' collect(mapValues(makePairs, function(x) { x * 2 }))
+#' # Output: list(list(1, 2), list(2, 4), list(3, 6), ...)
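+#' # A further illustration: keys are preserved, so key-based lookups still work.
+#' lookup(mapValues(makePairs, function(x) { x * 2 }), 3) # expected: list(6)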
+#'}
+#' @rdname mapValues
+#' @aliases mapValues,RDD,function-method
+#' @noRd
setMethod("mapValues",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -137,23 +142,24 @@ setMethod("mapValues",
lapply(X, func)
})
-# Pass each value in the key-value pair RDD through a flatMap function without
-# changing the keys; this also retains the original RDD's partitioning.
-#
-# The same as 'flatMapValues()' in Spark.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on the value of each element.
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
-# collect(flatMapValues(rdd, function(x) { x }))
-# Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
-#}
-# @rdname flatMapValues
-# @aliases flatMapValues,RDD,function-method
+#' Pass each value in the key-value pair RDD through a flatMap function without
+#' changing the keys; this also retains the original RDD's partitioning.
+#'
+#' The same as \code{flatMapValues()} in Spark.
+#'
+#' @param X The RDD to which the transformation is applied.
+#' @param FUN The transformation to apply to the value of each element.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
+#' collect(flatMapValues(rdd, function(x) { x }))
+#' # Output: list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))
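+#' # A further illustration: a value mapped to an empty vector drops its key.
+#' collect(flatMapValues(rdd, function(x) { x[x > 2] }))
+#' # expected: list(list(2, 3), list(2, 4))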
+#'}
+#' @rdname flatMapValues
+#' @aliases flatMapValues,RDD,function-method
+#' @noRd
setMethod("flatMapValues",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -165,38 +171,34 @@ setMethod("flatMapValues",
############ Shuffle Functions ############
-# Partition an RDD by key
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# For each element of this RDD, the partitioner is used to compute a hash
-# function and the RDD is partitioned using this hash value.
-#
-# @param x The RDD to partition. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param numPartitions Number of partitions to create.
-# @param ... Other optional arguments to partitionBy.
-#
-# @param partitionFunc The partition function to use. Uses a default hashCode
-# function if not provided
-# @return An RDD partitioned using the specified partitioner.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- partitionBy(rdd, 2L)
-# collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
-#}
-# @rdname partitionBy
-# @aliases partitionBy,RDD,integer-method
+#' Partition an RDD by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
+#' For each element of this RDD, the partitioner computes a hash of the key,
+#' and the RDD is partitioned using this hash value.
+#'
+#' @param x The RDD to partition. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @param ... Other optional arguments to partitionBy.
+#'
+#' @param partitionFunc The partition function to use. Uses a default hashCode
+#' function if not provided.
+#' @return An RDD partitioned using the specified partitioner.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- partitionBy(rdd, 2L)
+#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
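+#' # A further illustration: the shuffled RDD has the requested partition count.
+#' numPartitions(parts) # expected: 2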
+#'}
+#' @rdname partitionBy
+#' @aliases partitionBy,RDD,integer-method
+#' @noRd
setMethod("partitionBy",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, partitionFunc = hashCode) {
-
- #if (missing(partitionFunc)) {
- # partitionFunc <- hashCode
- #}
-
partitionFunc <- cleanClosure(partitionFunc)
serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)
@@ -233,27 +235,28 @@ setMethod("partitionBy",
RDD(r, serializedMode = "byte")
})
-# Group values by key
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# and group values for each key in the RDD into a single sequence.
-#
-# @param x The RDD to group. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param numPartitions Number of partitions to create.
-# @return An RDD where each element is list(K, list(V))
-# @seealso reduceByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- groupByKey(rdd, 2L)
-# grouped <- collect(parts)
-# grouped[[1]] # Should be a list(1, list(2, 4))
-#}
-# @rdname groupByKey
-# @aliases groupByKey,RDD,integer-method
+#' Group values by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and groups the values for each key in the RDD into a single sequence.
+#'
+#' @param x The RDD to group. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, list(V))
+#' @seealso reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- groupByKey(rdd, 2L)
+#' grouped <- collect(parts)
+#' grouped[[1]] # Should be a list(1, list(2, 4))
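+#' # A further illustration: the grouped RDD has one element per distinct key.
+#' count(parts) # expected: 2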
+#'}
+#' @rdname groupByKey
+#' @aliases groupByKey,RDD,integer-method
+#' @noRd
setMethod("groupByKey",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
@@ -291,28 +294,29 @@ setMethod("groupByKey",
lapplyPartition(shuffled, groupVals)
})
-# Merge values by key
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# and merges the values for each key using an associative reduce function.
-#
-# @param x The RDD to reduce by key. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param combineFunc The associative reduce function to use.
-# @param numPartitions Number of partitions to create.
-# @return An RDD where each element is list(K, V') where V' is the merged
-# value
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- reduceByKey(rdd, "+", 2L)
-# reduced <- collect(parts)
-# reduced[[1]] # Should be a list(1, 6)
-#}
-# @rdname reduceByKey
-# @aliases reduceByKey,RDD,integer-method
+#' Merge values by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function.
+#'
+#' @param x The RDD to reduce by key. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, V') where V' is the merged
+#' value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- reduceByKey(rdd, "+", 2L)
+#' reduced <- collect(parts)
+#' reduced[[1]] # Should be a list(1, 6)
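+#' # A further illustration: any associative binary function can be used.
+#' collect(reduceByKey(rdd, "max", 2L)) # expected: key 1 -> 4, key 1.1 -> 3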
+#'}
+#' @rdname reduceByKey
+#' @aliases reduceByKey,RDD,integer-method
+#' @noRd
setMethod("reduceByKey",
signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
function(x, combineFunc, numPartitions) {
@@ -332,27 +336,28 @@ setMethod("reduceByKey",
lapplyPartition(shuffled, reduceVals)
})
-# Merge values by key locally
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# and merges the values for each key using an associative reduce function, but return the
-# results immediately to the driver as an R list.
-#
-# @param x The RDD to reduce by key. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param combineFunc The associative reduce function to use.
-# @return A list of elements of type list(K, V') where V' is the merged value for each key
-# @seealso reduceByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# reduced <- reduceByKeyLocally(rdd, "+")
-# reduced # list(list(1, 6), list(1.1, 3))
-#}
-# @rdname reduceByKeyLocally
-# @aliases reduceByKeyLocally,RDD,integer-method
+#' Merge values by key locally
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function, but returns the
+#' results immediately to the driver as an R list.
+#'
+#' @param x The RDD to reduce by key. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @return A list of elements of type list(K, V') where V' is the merged value for each key
+#' @seealso reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' reduced <- reduceByKeyLocally(rdd, "+")
+#' reduced # list(list(1, 6), list(1.1, 3))
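+#' # A further illustration: the result is a plain R list on the driver,
+#' # so base R can post-process it directly.
+#' sum(unlist(lapply(reduced, function(p) { p[[2]] }))) # expected: 9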
+#'}
+#' @rdname reduceByKeyLocally
+#' @aliases reduceByKeyLocally,RDD,integer-method
+#' @noRd
setMethod("reduceByKeyLocally",
signature(x = "RDD", combineFunc = "ANY"),
function(x, combineFunc) {
@@ -384,41 +389,40 @@ setMethod("reduceByKeyLocally",
convertEnvsToList(merged[[1]], merged[[2]])
})
-# Combine values by key
-#
-# Generic function to combine the elements for each key using a custom set of
-# aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
-# for a "combined type" C. Note that V and C can be different -- for example, one
-# might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
-
-# Users provide three functions:
-# \itemize{
-# \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
-# \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list) -
-# \item mergeCombiners, to combine two C's into a single one (e.g., concatentates
-# two lists).
-# }
-#
-# @param x The RDD to combine. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param createCombiner Create a combiner (C) given a value (V)
-# @param mergeValue Merge the given value (V) with an existing combiner (C)
-# @param mergeCombiners Merge two combiners and return a new combiner
-# @param numPartitions Number of partitions to create.
-# @return An RDD where each element is list(K, C) where C is the combined type
-#
-# @seealso groupByKey, reduceByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
-# combined <- collect(parts)
-# combined[[1]] # Should be a list(1, 6)
-#}
-# @rdname combineByKey
-# @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
+#' Combine values by key
+#'
+#' Generic function to combine the elements for each key using a custom set of
+#' aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
+#' for a "combined type" C. Note that V and C can be different -- for example, one
+#' might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
+#' Users provide three functions:
+#' \itemize{
+#' \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
+#' \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
+#' \item mergeCombiners, to combine two C's into a single one (e.g., concatenates
+#' two lists).
+#' }
+#'
+#' @param x The RDD to combine. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param createCombiner Create a combiner (C) given a value (V)
+#' @param mergeValue Merge the given value (V) with an existing combiner (C)
+#' @param mergeCombiners Merge two combiners and return a new combiner
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, C) where C is the combined type
+#' @seealso groupByKey, reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
+#' combined <- collect(parts)
+#' combined[[1]] # Should be a list(1, 6)
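+#' # A further illustration: collecting per-key values into lists instead of sums.
+#' grouped <- combineByKey(rdd, function(v) { list(v) },
+#'                         function(acc, v) { c(acc, v) },
+#'                         function(acc1, acc2) { c(acc1, acc2) }, 2L)
+#' lookup(grouped, 1) # expected: list(list(2, 4))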
+#'}
+#' @rdname combineByKey
+#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
+#' @noRd
setMethod("combineByKey",
signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
mergeCombiners = "ANY", numPartitions = "numeric"),
@@ -450,36 +454,37 @@ setMethod("combineByKey",
lapplyPartition(shuffled, mergeAfterShuffle)
})
-# Aggregate a pair RDD by each key.
-#
-# Aggregate the values of each key in an RDD, using given combine functions
-# and a neutral "zero value". This function can return a different result type,
-# U, than the type of the values in this RDD, V. Thus, we need one operation
-# for merging a V into a U and one operation for merging two U's, The former
-# operation is used for merging values within a partition, and the latter is
-# used for merging values between partitions. To avoid memory allocation, both
-# of these functions are allowed to modify and return their first argument
-# instead of creating a new U.
-#
-# @param x An RDD.
-# @param zeroValue A neutral "zero value".
-# @param seqOp A function to aggregate the values of each key. It may return
-# a different result type from the type of the values.
-# @param combOp A function to aggregate results of seqOp.
-# @return An RDD containing the aggregation result.
-# @seealso foldByKey, combineByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-# zeroValue <- list(0, 0)
-# seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
-# combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
-# aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
-# # list(list(1, list(3, 2)), list(2, list(7, 2)))
-#}
-# @rdname aggregateByKey
-# @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
+#' Aggregate a pair RDD by each key.
+#'
+#' Aggregate the values of each key in an RDD, using given combine functions
+#' and a neutral "zero value". This function can return a different result type,
+#' U, than the type of the values in this RDD, V. Thus, we need one operation
+#' for merging a V into a U and one operation for merging two U's. The former
+#' operation is used for merging values within a partition, and the latter is
+#' used for merging values between partitions. To avoid memory allocation, both
+#' of these functions are allowed to modify and return their first argument
+#' instead of creating a new U.
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param seqOp A function to aggregate the values of each key. It may return
+#' a different result type from the type of the values.
+#' @param combOp A function to aggregate results of seqOp.
+#' @return An RDD containing the aggregation result.
+#' @seealso foldByKey, combineByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+#' zeroValue <- list(0, 0)
+#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
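+#' # A further illustration: a per-key mean follows from the (sum, count) pairs.
+#' res <- collect(aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L))
+#' lapply(res, function(p) { list(p[[1]], p[[2]][[1]] / p[[2]][[2]]) })
+#' # expected: list(list(1, 1.5), list(2, 3.5))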
+#'}
+#' @rdname aggregateByKey
+#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
+#' @noRd
setMethod("aggregateByKey",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
combOp = "ANY", numPartitions = "numeric"),
@@ -491,26 +496,27 @@ setMethod("aggregateByKey",
combineByKey(x, createCombiner, seqOp, combOp, numPartitions)
})
-# Fold a pair RDD by each key.
-#
-# Aggregate the values of each key in an RDD, using an associative function "func"
-# and a neutral "zero value" which may be added to the result an arbitrary
-# number of times, and must not change the result (e.g., 0 for addition, or
-# 1 for multiplication.).
-#
-# @param x An RDD.
-# @param zeroValue A neutral "zero value".
-# @param func An associative function for folding values of each key.
-# @return An RDD containing the aggregation result.
-# @seealso aggregateByKey, combineByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-# foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
-#}
-# @rdname foldByKey
-# @aliases foldByKey,RDD,ANY,ANY,integer-method
+#' Fold a pair RDD by each key.
+#'
+#' Aggregate the values of each key in an RDD, using an associative function "func"
+#' and a neutral "zero value" which may be added to the result an arbitrary
+#' number of times, and must not change the result (e.g., 0 for addition, or
+#' 1 for multiplication).
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param func An associative function for folding values of each key.
+#' @return An RDD containing the aggregation result.
+#' @seealso aggregateByKey, combineByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
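+#' # A further illustration: with 1 as the zero value, "*" folds to per-key products.
+#' foldByKey(rdd, 1, "*", 2L) # expected: list(list(1, 2), list(2, 12))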
+#'}
+#' @rdname foldByKey
+#' @aliases foldByKey,RDD,ANY,ANY,integer-method
+#' @noRd
setMethod("foldByKey",
signature(x = "RDD", zeroValue = "ANY",
func = "ANY", numPartitions = "numeric"),
@@ -520,28 +526,29 @@ setMethod("foldByKey",
############ Binary Functions #############
-# Join two RDDs
-#
-# @description
-# \code{join} This function joins two RDDs where every element is of the form list(K, V).
-# The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return a new RDD containing all pairs of elements with matching keys in
-# two input RDDs.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3))
-#}
-# @rdname join-methods
-# @aliases join,RDD,RDD-method
+#' Join two RDDs
+#'
+#' @description
+#' \code{join} joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return a new RDD containing all pairs of elements with matching keys in
+#' two input RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)))
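+#' # A further illustration: keys present in only one RDD (here 2) are dropped.
+#' lookup(join(rdd1, rdd2, 2L), 2) # expected: list()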
+#'}
+#' @rdname join-methods
+#' @aliases join,RDD,RDD-method
+#' @noRd
setMethod("join",
signature(x = "RDD", y = "RDD"),
function(x, y, numPartitions) {
@@ -556,30 +563,31 @@ setMethod("join",
doJoin)
})
-# Left outer join two RDDs
-#
-# @description
-# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of
-# the form list(K, V). The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return For each element (k, v) in x, the resulting RDD will either contain
-# all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
-# if no elements in rdd2 have key k.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# leftOuterJoin(rdd1, rdd2, 2L)
-# # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
-#}
-# @rdname join-methods
-# @aliases leftOuterJoin,RDD,RDD-method
+#' Left outer join two RDDs
+#'
+#' @description
+#' \code{leftOuterJoin} left-outer-joins two RDDs where every element is of
+#' the form list(K, V). The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, v) in x, the resulting RDD will either contain
+#' all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
+#' if no elements in rdd2 have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' leftOuterJoin(rdd1, rdd2, 2L)
+#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
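+#' # A further illustration: the unmatched key 2 from rdd1 is kept with NULL.
+#' lookup(leftOuterJoin(rdd1, rdd2, 2L), 2) # expected: list(list(4, NULL))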
+#'}
+#' @rdname join-methods
+#' @aliases leftOuterJoin,RDD,RDD-method
+#' @noRd
setMethod("leftOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -593,30 +601,31 @@ setMethod("leftOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
-# Right outer join two RDDs
-#
-# @description
-# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of
-# the form list(K, V). The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return For each element (k, w) in y, the resulting RDD will either contain
-# all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
-# if no elements in x have key k.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rightOuterJoin(rdd1, rdd2, 2L)
-# # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
-#}
-# @rdname join-methods
-# @aliases rightOuterJoin,RDD,RDD-method
+#' Right outer join two RDDs
+#'
+#' @description
+#' \code{rightOuterJoin} right-outer-joins two RDDs where every element is of
+#' the form list(K, V). The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, w) in y, the resulting RDD will either contain
+#' all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
+#' if no elements in x have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rightOuterJoin(rdd1, rdd2, 2L)
+#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
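+#' # A further illustration: the unmatched key 2 from rdd2 is kept with NULL.
+#' lookup(rightOuterJoin(rdd1, rdd2, 2L), 2) # expected: list(list(NULL, 4))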
+#'}
+#' @rdname join-methods
+#' @aliases rightOuterJoin,RDD,RDD-method
+#' @noRd
setMethod("rightOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -630,33 +639,34 @@ setMethod("rightOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
-# Full outer join two RDDs
-#
-# @description
-# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of
-# the form list(K, V). The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return For each element (k, v) in x and (k, w) in y, the resulting RDD
-# will contain all pairs (k, (v, w)) for both (k, v) in x and
-# (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
-# in x/y have key k.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
-# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
-# # list(1, list(3, 1)),
-# # list(2, list(NULL, 4)))
-# # list(3, list(3, NULL)),
-#}
-# @rdname join-methods
-# @aliases fullOuterJoin,RDD,RDD-method
+#' Full outer join two RDDs
+#'
+#' @description
+#' \code{fullOuterJoin} full-outer-joins two RDDs where every element is of
+#' the form list(K, V). The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD
+#' will contain all pairs (k, (v, w)) for both (k, v) in x and
+#' (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
+#' in x/y have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
+#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
+#'                               #      list(1, list(3, 1)),
+#'                               #      list(2, list(NULL, 4)),
+#'                               #      list(3, list(3, NULL)))
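+#' # A further illustration: unmatched keys from either side are retained.
+#' lookup(fullOuterJoin(rdd1, rdd2, 2L), 3) # expected: list(list(3, NULL))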
+#'}
+#' @rdname join-methods
+#' @aliases fullOuterJoin,RDD,RDD-method
+#' @noRd
setMethod("fullOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -670,23 +680,24 @@ setMethod("fullOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
-# For each key k in several RDDs, return a resulting RDD that
-# whose values are a list of values for the key in all RDDs.
-#
-# @param ... Several RDDs.
-# @param numPartitions Number of partitions to create.
-# @return a new RDD containing all pairs of elements with values in a list
-# in all RDDs.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# cogroup(rdd1, rdd2, numPartitions = 2L)
-# # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list()))
-#}
-# @rdname cogroup
-# @aliases cogroup,RDD-method
+#' For each key k in several RDDs, return a resulting RDD
+#' whose values are a list of values for the key in all RDDs.
+#'
+#' @param ... Several RDDs.
+#' @param numPartitions Number of partitions to create.
+#' @return a new RDD containing all pairs of elements with values in a list
+#' in all RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' cogroup(rdd1, rdd2, numPartitions = 2L)
+#' # list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))
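+#' # A further illustration: a key missing from one RDD gets an empty list slot.
+#' lookup(cogroup(rdd1, rdd2, numPartitions = 2L), 2)
+#' # expected: list(list(list(4), list()))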
+#'}
+#' @rdname cogroup
+#' @aliases cogroup,RDD-method
+#' @noRd
setMethod("cogroup",
"RDD",
function(..., numPartitions) {
@@ -722,20 +733,21 @@ setMethod("cogroup",
group.func)
})
-# Sort a (k, v) pair RDD by k.
-#
-# @param x A (k, v) pair RDD to be sorted.
-# @param ascending A flag to indicate whether the sorting is ascending or descending.
-# @param numPartitions Number of partitions to create.
-# @return An RDD where all (k, v) pair elements are sorted.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
-# collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
-#}
-# @rdname sortByKey
-# @aliases sortByKey,RDD,RDD-method
+#' Sort a (k, v) pair RDD by k.
+#'
+#' @param x A (k, v) pair RDD to be sorted.
+#' @param ascending A flag to indicate whether the sorting is ascending or descending.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where all (k, v) pair elements are sorted.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
+#' collect(sortByKey(rdd)) # list(list(1, 3), list(2, 2), list(3, 1))
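+#' # A further illustration: descending order via the ascending flag.
+#' collect(sortByKey(rdd, ascending = FALSE))
+#' # expected: list(list(3, 1), list(2, 2), list(1, 3))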
+#'}
+#' @rdname sortByKey
+#' @aliases sortByKey,RDD,RDD-method
+#' @noRd
setMethod("sortByKey",
signature(x = "RDD"),
function(x, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
@@ -784,25 +796,26 @@ setMethod("sortByKey",
lapplyPartition(newRDD, partitionFunc)
})
-# Subtract a pair RDD with another pair RDD.
-#
-# Return an RDD with the pairs from x whose keys are not in other.
-#
-# @param x An RDD.
-# @param other An RDD.
-# @param numPartitions Number of the partitions in the result RDD.
-# @return An RDD with the pairs from x whose keys are not in other.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
-# list("b", 5), list("a", 2)))
-# rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
-# collect(subtractByKey(rdd1, rdd2))
-# # list(list("b", 4), list("b", 5))
-#}
-# @rdname subtractByKey
-# @aliases subtractByKey,RDD
+#' Subtract a pair RDD with another pair RDD.
+#'
+#' Return an RDD with the pairs from x whose keys are not in other.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions Number of the partitions in the result RDD.
+#' @return An RDD with the pairs from x whose keys are not in other.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
+#' list("b", 5), list("a", 2)))
+#' rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
+#' collect(subtractByKey(rdd1, rdd2))
+#' # list(list("b", 4), list("b", 5))
+#'}
+#' @rdname subtractByKey
+#' @aliases subtractByKey,RDD
+#' @noRd
setMethod("subtractByKey",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
@@ -818,41 +831,42 @@ setMethod("subtractByKey",
function (v) { v[[1]] })
})
-# Return a subset of this RDD sampled by key.
-#
-# @description
-# \code{sampleByKey} Create a sample of this RDD using variable sampling rates
-# for different keys as specified by fractions, a key to sampling rate map.
-#
-# @param x The RDD to sample elements by key, where each element is
-# list(K, V) or c(K, V).
-# @param withReplacement Sampling with replacement or not
-# @param fraction The (rough) sample target fraction
-# @param seed Randomness seed value
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:3000)
-# pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
-# else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
-# fractions <- list(a = 0.2, b = 0.1, c = 0.3)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
-# 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
-# 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
-# 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
-# lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
-# lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
-# lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
-# lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
-# lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
-# lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
-# fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
-# fractions <- list(a = 0.2, b = 0.1)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
-#}
-# @rdname sampleByKey
-# @aliases sampleByKey,RDD-method
+#' Return a subset of this RDD sampled by key.
+#'
+#' @description
+#' \code{sampleByKey} creates a sample of this RDD using variable sampling rates
+#' for different keys, as specified by fractions, a key to sampling rate map.
+#'
+#' @param x The RDD to sample elements by key, where each element is
+#' list(K, V) or c(K, V).
+#' @param withReplacement Sampling with replacement or not
+#' @param fractions The (rough) per-key sample target fractions, a key to sampling rate map
+#' @param seed Randomness seed value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3000)
+#' pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
+#' else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
+#' fractions <- list(a = 0.2, b = 0.1, c = 0.3)
+#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
+#' 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
+#' 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
+#' 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
+#' lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
+#' lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
+#' lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
+#' lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
+#' lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
+#' lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
+#' fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
+#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
+#' fractions <- list(a = 0.2, b = 0.1)
+#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
+#'}
+#' @rdname sampleByKey
+#' @aliases sampleByKey,RDD-method
+#' @noRd
setMethod("sampleByKey",
signature(x = "RDD", withReplacement = "logical",
fractions = "vector", seed = "integer"),