aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhqzizania <qian.huang@intel.com>2015-05-08 11:25:04 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-05-08 11:25:04 -0700
commit008a60dd371e76819d8e08ab638cac7b3a48c9fc (patch)
tree88293a2d57c6bade201b7bdb3b1ebf0b13fdffcc
parent65afd3ce8b8a0b00f4ea8294eac14b72e964872d (diff)
downloadspark-008a60dd371e76819d8e08ab638cac7b3a48c9fc.tar.gz
spark-008a60dd371e76819d8e08ab638cac7b3a48c9fc.tar.bz2
spark-008a60dd371e76819d8e08ab638cac7b3a48c9fc.zip
[SPARK-6824] Fill the docs for DataFrame API in SparkR
This patch also removes the RDD docs from being built as a part of roxygen just by the method to delete " ' '" of " \#' ". Author: hqzizania <qian.huang@intel.com> Author: qhuang <qian.huang@intel.com> Closes #5969 from hqzizania/R1 and squashes the following commits: 6d27696 [qhuang] fixes in NAMESPACE eb4b095 [qhuang] remove more docs 6394579 [qhuang] remove RDD docs in generics.R 6813860 [hqzizania] Fill the docs for DataFrame API in SparkR 857220f [hqzizania] remove the pairRDD docs from being built as a part of roxygen c045d64 [hqzizania] remove the RDD docs from being built as a part of roxygen
-rw-r--r--R/pkg/DESCRIPTION2
-rw-r--r--R/pkg/NAMESPACE4
-rw-r--r--R/pkg/R/DataFrame.R95
-rw-r--r--R/pkg/R/RDD.R1546
-rw-r--r--R/pkg/R/SQLContext.R64
-rw-r--r--R/pkg/R/broadcast.R64
-rw-r--r--R/pkg/R/context.R240
-rw-r--r--R/pkg/R/generics.R318
-rw-r--r--R/pkg/R/pairRDD.R886
9 files changed, 1610 insertions, 1609 deletions
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 1c1779a763..efc85bbc4b 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -15,11 +15,11 @@ Suggests:
Description: R frontend for Spark
License: Apache License (== 2.0)
Collate:
+ 'schema.R'
'generics.R'
'jobj.R'
'RDD.R'
'pairRDD.R'
- 'schema.R'
'column.R'
'group.R'
'DataFrame.R'
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 3fb92be094..7611f479a6 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -26,7 +26,6 @@ exportMethods("cache",
"intersect",
"isLocal",
"join",
- "length",
"limit",
"orderBy",
"names",
@@ -101,9 +100,6 @@ export("cacheTable",
"tables",
"uncacheTable")
-export("sparkRSQL.init",
- "sparkRHive.init")
-
export("structField",
"structField.jobj",
"structField.character",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 47d92f141c..354642e7bc 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -45,6 +45,9 @@ setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) {
#' @rdname DataFrame
#' @export
+#'
+#' @param sdf A Java object reference to the backing Scala DataFrame
+#' @param isCached TRUE if the dataFrame is cached
dataFrame <- function(sdf, isCached = FALSE) {
new("DataFrame", sdf, isCached)
}
@@ -244,7 +247,7 @@ setMethod("columns",
})
#' @rdname columns
-#' @export
+#' @aliases names,DataFrame,function-method
setMethod("names",
signature(x = "DataFrame"),
function(x) {
@@ -399,23 +402,23 @@ setMethod("repartition",
dataFrame(sdf)
})
-#' toJSON
-#'
-#' Convert the rows of a DataFrame into JSON objects and return an RDD where
-#' each element contains a JSON string.
-#'
-#' @param x A SparkSQL DataFrame
-#' @return A StringRRDD of JSON objects
-#' @rdname tojson
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
-#' newRDD <- toJSON(df)
-#'}
+# toJSON
+#
+# Convert the rows of a DataFrame into JSON objects and return an RDD where
+# each element contains a JSON string.
+#
+#@param x A SparkSQL DataFrame
+# @return A StringRRDD of JSON objects
+# @rdname tojson
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# sqlCtx <- sparkRSQL.init(sc)
+# path <- "path/to/file.json"
+# df <- jsonFile(sqlCtx, path)
+# newRDD <- toJSON(df)
+#}
setMethod("toJSON",
signature(x = "DataFrame"),
function(x) {
@@ -578,8 +581,8 @@ setMethod("limit",
dataFrame(res)
})
-# Take the first NUM rows of a DataFrame and return a the results as a data.frame
-
+#' Take the first NUM rows of a DataFrame and return a the results as a data.frame
+#'
#' @rdname take
#' @export
#' @examples
@@ -644,22 +647,22 @@ setMethod("first",
take(x, 1)
})
-#' toRDD()
-#'
-#' Converts a Spark DataFrame to an RDD while preserving column names.
-#'
-#' @param x A Spark DataFrame
-#'
-#' @rdname DataFrame
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
-#' rdd <- toRDD(df)
-#' }
+# toRDD()
+#
+# Converts a Spark DataFrame to an RDD while preserving column names.
+#
+# @param x A Spark DataFrame
+#
+# @rdname DataFrame
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# sqlCtx <- sparkRSQL.init(sc)
+# path <- "path/to/file.json"
+# df <- jsonFile(sqlCtx, path)
+# rdd <- toRDD(df)
+# }
setMethod("toRDD",
signature(x = "DataFrame"),
function(x) {
@@ -706,6 +709,7 @@ setMethod("groupBy",
#'
#' Compute aggregates by specifying a list of columns
#'
+#' @param x a DataFrame
#' @rdname DataFrame
#' @export
setMethod("agg",
@@ -721,7 +725,7 @@ setMethod("agg",
# the requested map function. #
###################################################################################
-#' @rdname lapply
+# @rdname lapply
setMethod("lapply",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
@@ -729,14 +733,14 @@ setMethod("lapply",
lapply(rdd, FUN)
})
-#' @rdname lapply
+# @rdname lapply
setMethod("map",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapply(X, FUN)
})
-#' @rdname flatMap
+# @rdname flatMap
setMethod("flatMap",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
@@ -744,7 +748,7 @@ setMethod("flatMap",
flatMap(rdd, FUN)
})
-#' @rdname lapplyPartition
+# @rdname lapplyPartition
setMethod("lapplyPartition",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
@@ -752,14 +756,14 @@ setMethod("lapplyPartition",
lapplyPartition(rdd, FUN)
})
-#' @rdname lapplyPartition
+# @rdname lapplyPartition
setMethod("mapPartitions",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapplyPartition(X, FUN)
})
-#' @rdname foreach
+# @rdname foreach
setMethod("foreach",
signature(x = "DataFrame", func = "function"),
function(x, func) {
@@ -767,7 +771,7 @@ setMethod("foreach",
foreach(rdd, func)
})
-#' @rdname foreach
+# @rdname foreach
setMethod("foreachPartition",
signature(x = "DataFrame", func = "function"),
function(x, func) {
@@ -788,6 +792,7 @@ setMethod("$", signature(x = "DataFrame"),
getColumn(x, name)
})
+#' @rdname select
setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) {
stopifnot(class(value) == "Column" || is.null(value))
@@ -1009,7 +1014,7 @@ setMethod("sortDF",
})
#' @rdname sortDF
-#' @export
+#' @aliases orderBy,DataFrame,function-method
setMethod("orderBy",
signature(x = "DataFrame", col = "characterOrColumn"),
function(x, col) {
@@ -1046,7 +1051,7 @@ setMethod("filter",
})
#' @rdname filter
-#' @export
+#' @aliases where,DataFrame,function-method
setMethod("where",
signature(x = "DataFrame", condition = "characterOrColumn"),
function(x, condition) {
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index d1018c2361..73999a6737 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -19,16 +19,16 @@
setOldClass("jobj")
-#' @title S4 class that represents an RDD
-#' @description RDD can be created using functions like
-#' \code{parallelize}, \code{textFile} etc.
-#' @rdname RDD
-#' @seealso parallelize, textFile
-#'
-#' @slot env An R environment that stores bookkeeping states of the RDD
-#' @slot jrdd Java object reference to the backing JavaRDD
-#' to an RDD
-#' @export
+# @title S4 class that represents an RDD
+# @description RDD can be created using functions like
+# \code{parallelize}, \code{textFile} etc.
+# @rdname RDD
+# @seealso parallelize, textFile
+#
+# @slot env An R environment that stores bookkeeping states of the RDD
+# @slot jrdd Java object reference to the backing JavaRDD
+# to an RDD
+# @export
setClass("RDD",
slots = list(env = "environment",
jrdd = "jobj"))
@@ -108,14 +108,14 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
.Object
})
-#' @rdname RDD
-#' @export
-#'
-#' @param jrdd Java object reference to the backing JavaRDD
-#' @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
-#' stores strings, and "row" if the RDD stores the rows of a DataFrame
-#' @param isCached TRUE if the RDD is cached
-#' @param isCheckpointed TRUE if the RDD has been checkpointed
+# @rdname RDD
+# @export
+#
+# @param jrdd Java object reference to the backing JavaRDD
+# @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
+# stores strings, and "row" if the RDD stores the rows of a DataFrame
+# @param isCached TRUE if the RDD is cached
+# @param isCheckpointed TRUE if the RDD has been checkpointed
RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
isCheckpointed = FALSE) {
new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
@@ -200,19 +200,19 @@ setValidity("RDD",
############ Actions and Transformations ############
-#' Persist an RDD
-#'
-#' Persist this RDD with the default storage level (MEMORY_ONLY).
-#'
-#' @param x The RDD to cache
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10, 2L)
-#' cache(rdd)
-#'}
-#' @rdname cache-methods
-#' @aliases cache,RDD-method
+# Persist an RDD
+#
+# Persist this RDD with the default storage level (MEMORY_ONLY).
+#
+# @param x The RDD to cache
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10, 2L)
+# cache(rdd)
+#}
+# @rdname cache-methods
+# @aliases cache,RDD-method
setMethod("cache",
signature(x = "RDD"),
function(x) {
@@ -221,22 +221,22 @@ setMethod("cache",
x
})
-#' Persist an RDD
-#'
-#' Persist this RDD with the specified storage level. For details of the
-#' supported storage levels, refer to
-#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
-#'
-#' @param x The RDD to persist
-#' @param newLevel The new storage level to be assigned
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10, 2L)
-#' persist(rdd, "MEMORY_AND_DISK")
-#'}
-#' @rdname persist
-#' @aliases persist,RDD-method
+# Persist an RDD
+#
+# Persist this RDD with the specified storage level. For details of the
+# supported storage levels, refer to
+# http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+#
+# @param x The RDD to persist
+# @param newLevel The new storage level to be assigned
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10, 2L)
+# persist(rdd, "MEMORY_AND_DISK")
+#}
+# @rdname persist
+# @aliases persist,RDD-method
setMethod("persist",
signature(x = "RDD", newLevel = "character"),
function(x, newLevel) {
@@ -245,21 +245,21 @@ setMethod("persist",
x
})
-#' Unpersist an RDD
-#'
-#' Mark the RDD as non-persistent, and remove all blocks for it from memory and
-#' disk.
-#'
-#' @param x The RDD to unpersist
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10, 2L)
-#' cache(rdd) # rdd@@env$isCached == TRUE
-#' unpersist(rdd) # rdd@@env$isCached == FALSE
-#'}
-#' @rdname unpersist-methods
-#' @aliases unpersist,RDD-method
+# Unpersist an RDD
+#
+# Mark the RDD as non-persistent, and remove all blocks for it from memory and
+# disk.
+#
+# @param x The RDD to unpersist
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10, 2L)
+# cache(rdd) # rdd@@env$isCached == TRUE
+# unpersist(rdd) # rdd@@env$isCached == FALSE
+#}
+# @rdname unpersist-methods
+# @aliases unpersist,RDD-method
setMethod("unpersist",
signature(x = "RDD"),
function(x) {
@@ -268,24 +268,24 @@ setMethod("unpersist",
x
})
-#' Checkpoint an RDD
-#'
-#' Mark this RDD for checkpointing. It will be saved to a file inside the
-#' checkpoint directory set with setCheckpointDir() and all references to its
-#' parent RDDs will be removed. This function must be called before any job has
-#' been executed on this RDD. It is strongly recommended that this RDD is
-#' persisted in memory, otherwise saving it on a file will require recomputation.
-#'
-#' @param x The RDD to checkpoint
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' setCheckpointDir(sc, "checkpoint")
-#' rdd <- parallelize(sc, 1:10, 2L)
-#' checkpoint(rdd)
-#'}
-#' @rdname checkpoint-methods
-#' @aliases checkpoint,RDD-method
+# Checkpoint an RDD
+#
+# Mark this RDD for checkpointing. It will be saved to a file inside the
+# checkpoint directory set with setCheckpointDir() and all references to its
+# parent RDDs will be removed. This function must be called before any job has
+# been executed on this RDD. It is strongly recommended that this RDD is
+# persisted in memory, otherwise saving it on a file will require recomputation.
+#
+# @param x The RDD to checkpoint
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# setCheckpointDir(sc, "checkpoint")
+# rdd <- parallelize(sc, 1:10, 2L)
+# checkpoint(rdd)
+#}
+# @rdname checkpoint-methods
+# @aliases checkpoint,RDD-method
setMethod("checkpoint",
signature(x = "RDD"),
function(x) {
@@ -295,18 +295,18 @@ setMethod("checkpoint",
x
})
-#' Gets the number of partitions of an RDD
-#'
-#' @param x A RDD.
-#' @return the number of partitions of rdd as an integer.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10, 2L)
-#' numPartitions(rdd) # 2L
-#'}
-#' @rdname numPartitions
-#' @aliases numPartitions,RDD-method
+# Gets the number of partitions of an RDD
+#
+# @param x A RDD.
+# @return the number of partitions of rdd as an integer.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10, 2L)
+# numPartitions(rdd) # 2L
+#}
+# @rdname numPartitions
+# @aliases numPartitions,RDD-method
setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
@@ -315,24 +315,24 @@ setMethod("numPartitions",
callJMethod(partitions, "size")
})
-#' Collect elements of an RDD
-#'
-#' @description
-#' \code{collect} returns a list that contains all of the elements in this RDD.
-#'
-#' @param x The RDD to collect
-#' @param ... Other optional arguments to collect
-#' @param flatten FALSE if the list should not flattened
-#' @return a list containing elements in the RDD
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10, 2L)
-#' collect(rdd) # list from 1 to 10
-#' collectPartition(rdd, 0L) # list from 1 to 5
-#'}
-#' @rdname collect-methods
-#' @aliases collect,RDD-method
+# Collect elements of an RDD
+#
+# @description
+# \code{collect} returns a list that contains all of the elements in this RDD.
+#
+# @param x The RDD to collect
+# @param ... Other optional arguments to collect
+# @param flatten FALSE if the list should not flattened
+# @return a list containing elements in the RDD
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10, 2L)
+# collect(rdd) # list from 1 to 10
+# collectPartition(rdd, 0L) # list from 1 to 5
+#}
+# @rdname collect-methods
+# @aliases collect,RDD-method
setMethod("collect",
signature(x = "RDD"),
function(x, flatten = TRUE) {
@@ -343,12 +343,12 @@ setMethod("collect",
})
-#' @description
-#' \code{collectPartition} returns a list that contains all of the elements
-#' in the specified partition of the RDD.
-#' @param partitionId the partition to collect (starts from 0)
-#' @rdname collect-methods
-#' @aliases collectPartition,integer,RDD-method
+# @description
+# \code{collectPartition} returns a list that contains all of the elements
+# in the specified partition of the RDD.
+# @param partitionId the partition to collect (starts from 0)
+# @rdname collect-methods
+# @aliases collectPartition,integer,RDD-method
setMethod("collectPartition",
signature(x = "RDD", partitionId = "integer"),
function(x, partitionId) {
@@ -361,17 +361,17 @@ setMethod("collectPartition",
serializedMode = getSerializedMode(x))
})
-#' @description
-#' \code{collectAsMap} returns a named list as a map that contains all of the elements
-#' in a key-value pair RDD.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
-#' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
-#'}
-#' @rdname collect-methods
-#' @aliases collectAsMap,RDD-method
+# @description
+# \code{collectAsMap} returns a named list as a map that contains all of the elements
+# in a key-value pair RDD.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
+# collectAsMap(rdd) # list(`1` = 2, `3` = 4)
+#}
+# @rdname collect-methods
+# @aliases collectAsMap,RDD-method
setMethod("collectAsMap",
signature(x = "RDD"),
function(x) {
@@ -381,19 +381,19 @@ setMethod("collectAsMap",
as.list(map)
})
-#' Return the number of elements in the RDD.
-#'
-#' @param x The RDD to count
-#' @return number of elements in the RDD.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' count(rdd) # 10
-#' length(rdd) # Same as count
-#'}
-#' @rdname count
-#' @aliases count,RDD-method
+# Return the number of elements in the RDD.
+#
+# @param x The RDD to count
+# @return number of elements in the RDD.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# count(rdd) # 10
+# length(rdd) # Same as count
+#}
+# @rdname count
+# @aliases count,RDD-method
setMethod("count",
signature(x = "RDD"),
function(x) {
@@ -405,31 +405,31 @@ setMethod("count",
sum(as.integer(vals))
})
-#' Return the number of elements in the RDD
-#' @export
-#' @rdname count
+# Return the number of elements in the RDD
+# @export
+# @rdname count
setMethod("length",
signature(x = "RDD"),
function(x) {
count(x)
})
-#' Return the count of each unique value in this RDD as a list of
-#' (value, count) pairs.
-#'
-#' Same as countByValue in Spark.
-#'
-#' @param x The RDD to count
-#' @return list of (value, count) pairs, where count is number of each unique
-#' value in rdd.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, c(1,2,3,2,1))
-#' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
-#'}
-#' @rdname countByValue
-#' @aliases countByValue,RDD-method
+# Return the count of each unique value in this RDD as a list of
+# (value, count) pairs.
+#
+# Same as countByValue in Spark.
+#
+# @param x The RDD to count
+# @return list of (value, count) pairs, where count is number of each unique
+# value in rdd.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, c(1,2,3,2,1))
+# countByValue(rdd) # (1,2L), (2,2L), (3,1L)
+#}
+# @rdname countByValue
+# @aliases countByValue,RDD-method
setMethod("countByValue",
signature(x = "RDD"),
function(x) {
@@ -437,23 +437,23 @@ setMethod("countByValue",
collect(reduceByKey(ones, `+`, numPartitions(x)))
})
-#' Apply a function to all elements
-#'
-#' This function creates a new RDD by applying the given transformation to all
-#' elements of the given RDD
-#'
-#' @param X The RDD to apply the transformation.
-#' @param FUN the transformation to apply on each element
-#' @return a new RDD created by the transformation.
-#' @rdname lapply
-#' @aliases lapply
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
-#' collect(multiplyByTwo) # 2,4,6...
-#'}
+# Apply a function to all elements
+#
+# This function creates a new RDD by applying the given transformation to all
+# elements of the given RDD
+#
+# @param X The RDD to apply the transformation.
+# @param FUN the transformation to apply on each element
+# @return a new RDD created by the transformation.
+# @rdname lapply
+# @aliases lapply
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
+# collect(multiplyByTwo) # 2,4,6...
+#}
setMethod("lapply",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -463,31 +463,31 @@ setMethod("lapply",
lapplyPartitionsWithIndex(X, func)
})
-#' @rdname lapply
-#' @aliases map,RDD,function-method
+# @rdname lapply
+# @aliases map,RDD,function-method
setMethod("map",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapply(X, FUN)
})
-#' Flatten results after apply a function to all elements
-#'
-#' This function return a new RDD by first applying a function to all
-#' elements of this RDD, and then flattening the results.
-#'
-#' @param X The RDD to apply the transformation.
-#' @param FUN the transformation to apply on each element
-#' @return a new RDD created by the transformation.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
-#' collect(multiplyByTwo) # 2,20,4,40,6,60...
-#'}
-#' @rdname flatMap
-#' @aliases flatMap,RDD,function-method
+# Flatten results after apply a function to all elements
+#
+# This function return a new RDD by first applying a function to all
+# elements of this RDD, and then flattening the results.
+#
+# @param X The RDD to apply the transformation.
+# @param FUN the transformation to apply on each element
+# @return a new RDD created by the transformation.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
+# collect(multiplyByTwo) # 2,20,4,40,6,60...
+#}
+# @rdname flatMap
+# @aliases flatMap,RDD,function-method
setMethod("flatMap",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -500,83 +500,83 @@ setMethod("flatMap",
lapplyPartition(X, partitionFunc)
})
-#' Apply a function to each partition of an RDD
-#'
-#' Return a new RDD by applying a function to each partition of this RDD.
-#'
-#' @param X The RDD to apply the transformation.
-#' @param FUN the transformation to apply on each partition.
-#' @return a new RDD created by the transformation.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
-#' collect(partitionSum) # 15, 40
-#'}
-#' @rdname lapplyPartition
-#' @aliases lapplyPartition,RDD,function-method
+# Apply a function to each partition of an RDD
+#
+# Return a new RDD by applying a function to each partition of this RDD.
+#
+# @param X The RDD to apply the transformation.
+# @param FUN the transformation to apply on each partition.
+# @return a new RDD created by the transformation.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
+# collect(partitionSum) # 15, 40
+#}
+# @rdname lapplyPartition
+# @aliases lapplyPartition,RDD,function-method
setMethod("lapplyPartition",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartitionsWithIndex(X, function(s, part) { FUN(part) })
})
-#' mapPartitions is the same as lapplyPartition.
-#'
-#' @rdname lapplyPartition
-#' @aliases mapPartitions,RDD,function-method
+# mapPartitions is the same as lapplyPartition.
+#
+# @rdname lapplyPartition
+# @aliases mapPartitions,RDD,function-method
setMethod("mapPartitions",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartition(X, FUN)
})
-#' Return a new RDD by applying a function to each partition of this RDD, while
-#' tracking the index of the original partition.
-#'
-#' @param X The RDD to apply the transformation.
-#' @param FUN the transformation to apply on each partition; takes the partition
-#' index and a list of elements in the particular partition.
-#' @return a new RDD created by the transformation.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10, 5L)
-#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
-#' partIndex * Reduce("+", part) })
-#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
-#'}
-#' @rdname lapplyPartitionsWithIndex
-#' @aliases lapplyPartitionsWithIndex,RDD,function-method
+# Return a new RDD by applying a function to each partition of this RDD, while
+# tracking the index of the original partition.
+#
+# @param X The RDD to apply the transformation.
+# @param FUN the transformation to apply on each partition; takes the partition
+# index and a list of elements in the particular partition.
+# @return a new RDD created by the transformation.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10, 5L)
+# prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+# partIndex * Reduce("+", part) })
+# collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
+#}
+# @rdname lapplyPartitionsWithIndex
+# @aliases lapplyPartitionsWithIndex,RDD,function-method
setMethod("lapplyPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
PipelinedRDD(X, FUN)
})
-#' @rdname lapplyPartitionsWithIndex
-#' @aliases mapPartitionsWithIndex,RDD,function-method
+# @rdname lapplyPartitionsWithIndex
+# @aliases mapPartitionsWithIndex,RDD,function-method
setMethod("mapPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartitionsWithIndex(X, FUN)
})
-#' This function returns a new RDD containing only the elements that satisfy
-#' a predicate (i.e. returning TRUE in a given logical function).
-#' The same as `filter()' in Spark.
-#'
-#' @param x The RDD to be filtered.
-#' @param f A unary predicate function.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
-#'}
-#' @rdname filterRDD
-#' @aliases filterRDD,RDD,function-method
+# This function returns a new RDD containing only the elements that satisfy
+# a predicate (i.e. returning TRUE in a given logical function).
+# The same as `filter()' in Spark.
+#
+# @param x The RDD to be filtered.
+# @param f A unary predicate function.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
+#}
+# @rdname filterRDD
+# @aliases filterRDD,RDD,function-method
setMethod("filterRDD",
signature(x = "RDD", f = "function"),
function(x, f) {
@@ -586,30 +586,30 @@ setMethod("filterRDD",
lapplyPartition(x, filter.func)
})
-#' @rdname filterRDD
-#' @aliases Filter
+# @rdname filterRDD
+# @aliases Filter
setMethod("Filter",
signature(f = "function", x = "RDD"),
function(f, x) {
filterRDD(x, f)
})
-#' Reduce across elements of an RDD.
-#'
-#' This function reduces the elements of this RDD using the
-#' specified commutative and associative binary operator.
-#'
-#' @param x The RDD to reduce
-#' @param func Commutative and associative function to apply on elements
-#' of the RDD.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' reduce(rdd, "+") # 55
-#'}
-#' @rdname reduce
-#' @aliases reduce,RDD,ANY-method
+# Reduce across elements of an RDD.
+#
+# This function reduces the elements of this RDD using the
+# specified commutative and associative binary operator.
+#
+# @param x The RDD to reduce
+# @param func Commutative and associative function to apply on elements
+# of the RDD.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# reduce(rdd, "+") # 55
+#}
+# @rdname reduce
+# @aliases reduce,RDD,ANY-method
setMethod("reduce",
signature(x = "RDD", func = "ANY"),
function(x, func) {
@@ -623,70 +623,70 @@ setMethod("reduce",
Reduce(func, partitionList)
})
-#' Get the maximum element of an RDD.
-#'
-#' @param x The RDD to get the maximum element from
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' maximum(rdd) # 10
-#'}
-#' @rdname maximum
-#' @aliases maximum,RDD
+# Get the maximum element of an RDD.
+#
+# @param x The RDD to get the maximum element from
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# maximum(rdd) # 10
+#}
+# @rdname maximum
+# @aliases maximum,RDD
setMethod("maximum",
signature(x = "RDD"),
function(x) {
reduce(x, max)
})
-#' Get the minimum element of an RDD.
-#'
-#' @param x The RDD to get the minimum element from
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' minimum(rdd) # 1
-#'}
-#' @rdname minimum
-#' @aliases minimum,RDD
+# Get the minimum element of an RDD.
+#
+# @param x The RDD to get the minimum element from
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# minimum(rdd) # 1
+#}
+# @rdname minimum
+# @aliases minimum,RDD
setMethod("minimum",
signature(x = "RDD"),
function(x) {
reduce(x, min)
})
-#' Add up the elements in an RDD.
-#'
-#' @param x The RDD to add up the elements in
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' sumRDD(rdd) # 55
-#'}
-#' @rdname sumRDD
-#' @aliases sumRDD,RDD
+# Add up the elements in an RDD.
+#
+# @param x The RDD to add up the elements in
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# sumRDD(rdd) # 55
+#}
+# @rdname sumRDD
+# @aliases sumRDD,RDD
setMethod("sumRDD",
signature(x = "RDD"),
function(x) {
reduce(x, "+")
})
-#' Applies a function to all elements in an RDD, and force evaluation.
-#'
-#' @param x The RDD to apply the function
-#' @param func The function to be applied.
-#' @return invisible NULL.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' foreach(rdd, function(x) { save(x, file=...) })
-#'}
-#' @rdname foreach
-#' @aliases foreach,RDD,function-method
+# Applies a function to all elements in an RDD, and force evaluation.
+#
+# @param x The RDD to apply the function
+# @param func The function to be applied.
+# @return invisible NULL.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# foreach(rdd, function(x) { save(x, file=...) })
+#}
+# @rdname foreach
+# @aliases foreach,RDD,function-method
setMethod("foreach",
signature(x = "RDD", func = "function"),
function(x, func) {
@@ -697,37 +697,37 @@ setMethod("foreach",
invisible(collect(mapPartitions(x, partition.func)))
})
-#' Applies a function to each partition in an RDD, and force evaluation.
-#'
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' foreachPartition(rdd, function(part) { save(part, file=...); NULL })
-#'}
-#' @rdname foreach
-#' @aliases foreachPartition,RDD,function-method
+# Applies a function to each partition in an RDD, and force evaluation.
+#
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# foreachPartition(rdd, function(part) { save(part, file=...); NULL })
+#}
+# @rdname foreach
+# @aliases foreachPartition,RDD,function-method
setMethod("foreachPartition",
signature(x = "RDD", func = "function"),
function(x, func) {
invisible(collect(mapPartitions(x, func)))
})
-#' Take elements from an RDD.
-#'
-#' This function takes the first NUM elements in the RDD and
-#' returns them in a list.
-#'
-#' @param x The RDD to take elements from
-#' @param num Number of elements to take
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' take(rdd, 2L) # list(1, 2)
-#'}
-#' @rdname take
-#' @aliases take,RDD,numeric-method
+# Take elements from an RDD.
+#
+# This function takes the first NUM elements in the RDD and
+# returns them in a list.
+#
+# @param x The RDD to take elements from
+# @param num Number of elements to take
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# take(rdd, 2L) # list(1, 2)
+#}
+# @rdname take
+# @aliases take,RDD,numeric-method
setMethod("take",
signature(x = "RDD", num = "numeric"),
function(x, num) {
@@ -762,39 +762,39 @@ setMethod("take",
})
-#' First
-#'
-#' Return the first element of an RDD
-#'
-#' @rdname first
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' first(rdd)
-#' }
+# First
+#
+# Return the first element of an RDD
+#
+# @rdname first
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# first(rdd)
+# }
setMethod("first",
signature(x = "RDD"),
function(x) {
take(x, 1)[[1]]
})
-#' Removes the duplicates from RDD.
-#'
-#' This function returns a new RDD containing the distinct elements in the
-#' given RDD. The same as `distinct()' in Spark.
-#'
-#' @param x The RDD to remove duplicates from.
-#' @param numPartitions Number of partitions to create.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, c(1,2,2,3,3,3))
-#' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
-#'}
-#' @rdname distinct
-#' @aliases distinct,RDD-method
+# Removes the duplicates from RDD.
+#
+# This function returns a new RDD containing the distinct elements in the
+# given RDD. The same as `distinct()' in Spark.
+#
+# @param x The RDD to remove duplicates from.
+# @param numPartitions Number of partitions to create.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, c(1,2,2,3,3,3))
+# sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
+#}
+# @rdname distinct
+# @aliases distinct,RDD-method
setMethod("distinct",
signature(x = "RDD"),
function(x, numPartitions = SparkR:::numPartitions(x)) {
@@ -806,24 +806,24 @@ setMethod("distinct",
resRDD
})
-#' Return an RDD that is a sampled subset of the given RDD.
-#'
-#' The same as `sample()' in Spark. (We rename it due to signature
-#' inconsistencies with the `sample()' function in R's base package.)
-#'
-#' @param x The RDD to sample elements from
-#' @param withReplacement Sampling with replacement or not
-#' @param fraction The (rough) sample target fraction
-#' @param seed Randomness seed value
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
-#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
-#'}
-#' @rdname sampleRDD
-#' @aliases sampleRDD,RDD
+# Return an RDD that is a sampled subset of the given RDD.
+#
+# The same as `sample()' in Spark. (We rename it due to signature
+# inconsistencies with the `sample()' function in R's base package.)
+#
+# @param x The RDD to sample elements from
+# @param withReplacement Sampling with replacement or not
+# @param fraction The (rough) sample target fraction
+# @param seed Randomness seed value
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
+# collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
+#}
+# @rdname sampleRDD
+# @aliases sampleRDD,RDD
setMethod("sampleRDD",
signature(x = "RDD", withReplacement = "logical",
fraction = "numeric", seed = "integer"),
@@ -867,23 +867,23 @@ setMethod("sampleRDD",
lapplyPartitionsWithIndex(x, samplingFunc)
})
-#' Return a list of the elements that are a sampled subset of the given RDD.
-#'
-#' @param x The RDD to sample elements from
-#' @param withReplacement Sampling with replacement or not
-#' @param num Number of elements to return
-#' @param seed Randomness seed value
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:100)
-#' # exactly 5 elements sampled, which may not be distinct
-#' takeSample(rdd, TRUE, 5L, 1618L)
-#' # exactly 5 distinct elements sampled
-#' takeSample(rdd, FALSE, 5L, 16181618L)
-#'}
-#' @rdname takeSample
-#' @aliases takeSample,RDD
+# Return a list of the elements that are a sampled subset of the given RDD.
+#
+# @param x The RDD to sample elements from
+# @param withReplacement Sampling with replacement or not
+# @param num Number of elements to return
+# @param seed Randomness seed value
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:100)
+# # exactly 5 elements sampled, which may not be distinct
+# takeSample(rdd, TRUE, 5L, 1618L)
+# # exactly 5 distinct elements sampled
+# takeSample(rdd, FALSE, 5L, 16181618L)
+#}
+# @rdname takeSample
+# @aliases takeSample,RDD
setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
num = "integer", seed = "integer"),
function(x, withReplacement, num, seed) {
@@ -930,18 +930,18 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
sample(samples)[1:total]
})
-#' Creates tuples of the elements in this RDD by applying a function.
-#'
-#' @param x The RDD.
-#' @param func The function to be applied.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(1, 2, 3))
-#' collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
-#'}
-#' @rdname keyBy
-#' @aliases keyBy,RDD
+# Creates tuples of the elements in this RDD by applying a function.
+#
+# @param x The RDD.
+# @param func The function to be applied.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(1, 2, 3))
+# collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
+#}
+# @rdname keyBy
+# @aliases keyBy,RDD
setMethod("keyBy",
signature(x = "RDD", func = "function"),
function(x, func) {
@@ -951,44 +951,44 @@ setMethod("keyBy",
lapply(x, apply.func)
})
-#' Return a new RDD that has exactly numPartitions partitions.
-#' Can increase or decrease the level of parallelism in this RDD. Internally,
-#' this uses a shuffle to redistribute data.
-#' If you are decreasing the number of partitions in this RDD, consider using
-#' coalesce, which can avoid performing a shuffle.
-#'
-#' @param x The RDD.
-#' @param numPartitions Number of partitions to create.
-#' @seealso coalesce
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
-#' numPartitions(rdd) # 4
-#' numPartitions(repartition(rdd, 2L)) # 2
-#'}
-#' @rdname repartition
-#' @aliases repartition,RDD
+# Return a new RDD that has exactly numPartitions partitions.
+# Can increase or decrease the level of parallelism in this RDD. Internally,
+# this uses a shuffle to redistribute data.
+# If you are decreasing the number of partitions in this RDD, consider using
+# coalesce, which can avoid performing a shuffle.
+#
+# @param x The RDD.
+# @param numPartitions Number of partitions to create.
+# @seealso coalesce
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
+# numPartitions(rdd) # 4
+# numPartitions(repartition(rdd, 2L)) # 2
+#}
+# @rdname repartition
+# @aliases repartition,RDD
setMethod("repartition",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
coalesce(x, numPartitions, TRUE)
})
-#' Return a new RDD that is reduced into numPartitions partitions.
-#'
-#' @param x The RDD.
-#' @param numPartitions Number of partitions to create.
-#' @seealso repartition
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
-#' numPartitions(rdd) # 3
-#' numPartitions(coalesce(rdd, 1L)) # 1
-#'}
-#' @rdname coalesce
-#' @aliases coalesce,RDD
+# Return a new RDD that is reduced into numPartitions partitions.
+#
+# @param x The RDD.
+# @param numPartitions Number of partitions to create.
+# @seealso repartition
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
+# numPartitions(rdd) # 3
+# numPartitions(coalesce(rdd, 1L)) # 1
+#}
+# @rdname coalesce
+# @aliases coalesce,RDD
setMethod("coalesce",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
@@ -1012,19 +1012,19 @@ setMethod("coalesce",
}
})
-#' Save this RDD as a SequenceFile of serialized objects.
-#'
-#' @param x The RDD to save
-#' @param path The directory where the file is saved
-#' @seealso objectFile
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:3)
-#' saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
-#'}
-#' @rdname saveAsObjectFile
-#' @aliases saveAsObjectFile,RDD
+# Save this RDD as a SequenceFile of serialized objects.
+#
+# @param x The RDD to save
+# @param path The directory where the file is saved
+# @seealso objectFile
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:3)
+# saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
+#}
+# @rdname saveAsObjectFile
+# @aliases saveAsObjectFile,RDD
setMethod("saveAsObjectFile",
signature(x = "RDD", path = "character"),
function(x, path) {
@@ -1037,18 +1037,18 @@ setMethod("saveAsObjectFile",
invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path))
})
-#' Save this RDD as a text file, using string representations of elements.
-#'
-#' @param x The RDD to save
-#' @param path The directory where the partitions of the text file are saved
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:3)
-#' saveAsTextFile(rdd, "/tmp/sparkR-tmp")
-#'}
-#' @rdname saveAsTextFile
-#' @aliases saveAsTextFile,RDD
+# Save this RDD as a text file, using string representations of elements.
+#
+# @param x The RDD to save
+# @param path The directory where the partitions of the text file are saved
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:3)
+# saveAsTextFile(rdd, "/tmp/sparkR-tmp")
+#}
+# @rdname saveAsTextFile
+# @aliases saveAsTextFile,RDD
setMethod("saveAsTextFile",
signature(x = "RDD", path = "character"),
function(x, path) {
@@ -1061,21 +1061,21 @@ setMethod("saveAsTextFile",
callJMethod(getJRDD(stringRdd, serializedMode = "string"), "saveAsTextFile", path))
})
-#' Sort an RDD by the given key function.
-#'
-#' @param x An RDD to be sorted.
-#' @param func A function used to compute the sort key for each element.
-#' @param ascending A flag to indicate whether the sorting is ascending or descending.
-#' @param numPartitions Number of partitions to create.
-#' @return An RDD where all elements are sorted.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(3, 2, 1))
-#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
-#'}
-#' @rdname sortBy
-#' @aliases sortBy,RDD,RDD-method
+# Sort an RDD by the given key function.
+#
+# @param x An RDD to be sorted.
+# @param func A function used to compute the sort key for each element.
+# @param ascending A flag to indicate whether the sorting is ascending or descending.
+# @param numPartitions Number of partitions to create.
+# @return An RDD where all elements are sorted.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(3, 2, 1))
+# collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
+#}
+# @rdname sortBy
+# @aliases sortBy,RDD,RDD-method
setMethod("sortBy",
signature(x = "RDD", func = "function"),
function(x, func, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
@@ -1137,97 +1137,97 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList
}
-#' Returns the first N elements from an RDD in ascending order.
-#'
-#' @param x An RDD.
-#' @param num Number of elements to return.
-#' @return The first N elements from the RDD in ascending order.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
-#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
-#'}
-#' @rdname takeOrdered
-#' @aliases takeOrdered,RDD,RDD-method
+# Returns the first N elements from an RDD in ascending order.
+#
+# @param x An RDD.
+# @param num Number of elements to return.
+# @return The first N elements from the RDD in ascending order.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+# takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
+#}
+# @rdname takeOrdered
+# @aliases takeOrdered,RDD,RDD-method
setMethod("takeOrdered",
signature(x = "RDD", num = "integer"),
function(x, num) {
takeOrderedElem(x, num)
})
-#' Returns the top N elements from an RDD.
-#'
-#' @param x An RDD.
-#' @param num Number of elements to return.
-#' @return The top N elements from the RDD.
-#' @rdname top
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
-#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
-#'}
-#' @rdname top
-#' @aliases top,RDD,RDD-method
+# Returns the top N elements from an RDD.
+#
+# @param x An RDD.
+# @param num Number of elements to return.
+# @return The top N elements from the RDD.
+# @rdname top
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+# top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
+#}
+# @rdname top
+# @aliases top,RDD,RDD-method
setMethod("top",
signature(x = "RDD", num = "integer"),
function(x, num) {
takeOrderedElem(x, num, FALSE)
})
-#' Fold an RDD using a given associative function and a neutral "zero value".
-#'
-#' Aggregate the elements of each partition, and then the results for all the
-#' partitions, using a given associative function and a neutral "zero value".
-#'
-#' @param x An RDD.
-#' @param zeroValue A neutral "zero value".
-#' @param op An associative function for the folding operation.
-#' @return The folding result.
-#' @rdname fold
-#' @seealso reduce
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
-#' fold(rdd, 0, "+") # 15
-#'}
-#' @rdname fold
-#' @aliases fold,RDD,RDD-method
+# Fold an RDD using a given associative function and a neutral "zero value".
+#
+# Aggregate the elements of each partition, and then the results for all the
+# partitions, using a given associative function and a neutral "zero value".
+#
+# @param x An RDD.
+# @param zeroValue A neutral "zero value".
+# @param op An associative function for the folding operation.
+# @return The folding result.
+# @rdname fold
+# @seealso reduce
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
+# fold(rdd, 0, "+") # 15
+#}
+# @rdname fold
+# @aliases fold,RDD,RDD-method
setMethod("fold",
signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
function(x, zeroValue, op) {
aggregateRDD(x, zeroValue, op, op)
})
-#' Aggregate an RDD using the given combine functions and a neutral "zero value".
-#'
-#' Aggregate the elements of each partition, and then the results for all the
-#' partitions, using given combine functions and a neutral "zero value".
-#'
-#' @param x An RDD.
-#' @param zeroValue A neutral "zero value".
-#' @param seqOp A function to aggregate the RDD elements. It may return a different
-#' result type from the type of the RDD elements.
-#' @param combOp A function to aggregate results of seqOp.
-#' @return The aggregation result.
-#' @rdname aggregateRDD
-#' @seealso reduce
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(1, 2, 3, 4))
-#' zeroValue <- list(0, 0)
-#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
-#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
-#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
-#'}
-#' @rdname aggregateRDD
-#' @aliases aggregateRDD,RDD,RDD-method
+# Aggregate an RDD using the given combine functions and a neutral "zero value".
+#
+# Aggregate the elements of each partition, and then the results for all the
+# partitions, using given combine functions and a neutral "zero value".
+#
+# @param x An RDD.
+# @param zeroValue A neutral "zero value".
+# @param seqOp A function to aggregate the RDD elements. It may return a different
+# result type from the type of the RDD elements.
+# @param combOp A function to aggregate results of seqOp.
+# @return The aggregation result.
+# @rdname aggregateRDD
+# @seealso reduce
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(1, 2, 3, 4))
+# zeroValue <- list(0, 0)
+# seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+# combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+# aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
+#}
+# @rdname aggregateRDD
+# @aliases aggregateRDD,RDD,RDD-method
setMethod("aggregateRDD",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
function(x, zeroValue, seqOp, combOp) {
@@ -1240,25 +1240,25 @@ setMethod("aggregateRDD",
Reduce(combOp, partitionList, zeroValue)
})
-#' Pipes elements to a forked external process.
-#'
-#' The same as 'pipe()' in Spark.
-#'
-#' @param x The RDD whose elements are piped to the forked external process.
-#' @param command The command to fork an external process.
-#' @param env A named list to set environment variables of the external process.
-#' @return A new RDD created by piping all elements to a forked external process.
-#' @rdname pipeRDD
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' collect(pipeRDD(rdd, "more")
-#' Output: c("1", "2", ..., "10")
-#'}
-#' @rdname pipeRDD
-#' @aliases pipeRDD,RDD,character-method
+# Pipes elements to a forked external process.
+#
+# The same as 'pipe()' in Spark.
+#
+# @param x The RDD whose elements are piped to the forked external process.
+# @param command The command to fork an external process.
+# @param env A named list to set environment variables of the external process.
+# @return A new RDD created by piping all elements to a forked external process.
+# @rdname pipeRDD
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# collect(pipeRDD(rdd, "more")
+# Output: c("1", "2", ..., "10")
+#}
+# @rdname pipeRDD
+# @aliases pipeRDD,RDD,character-method
setMethod("pipeRDD",
signature(x = "RDD", command = "character"),
function(x, command, env = list()) {
@@ -1274,41 +1274,41 @@ setMethod("pipeRDD",
})
# TODO: Consider caching the name in the RDD's environment
-#' Return an RDD's name.
-#'
-#' @param x The RDD whose name is returned.
-#' @rdname name
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(1,2,3))
-#' name(rdd) # NULL (if not set before)
-#'}
-#' @rdname name
-#' @aliases name,RDD
+# Return an RDD's name.
+#
+# @param x The RDD whose name is returned.
+# @rdname name
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(1,2,3))
+# name(rdd) # NULL (if not set before)
+#}
+# @rdname name
+# @aliases name,RDD
setMethod("name",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "name")
})
-#' Set an RDD's name.
-#'
-#' @param x The RDD whose name is to be set.
-#' @param name The RDD name to be set.
-#' @return a new RDD renamed.
-#' @rdname setName
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(1,2,3))
-#' setName(rdd, "myRDD")
-#' name(rdd) # "myRDD"
-#'}
-#' @rdname setName
-#' @aliases setName,RDD
+# Set an RDD's name.
+#
+# @param x The RDD whose name is to be set.
+# @param name The RDD name to be set.
+# @return a new RDD renamed.
+# @rdname setName
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(1,2,3))
+# setName(rdd, "myRDD")
+# name(rdd) # "myRDD"
+#}
+# @rdname setName
+# @aliases setName,RDD
setMethod("setName",
signature(x = "RDD", name = "character"),
function(x, name) {
@@ -1316,25 +1316,25 @@ setMethod("setName",
x
})
-#' Zip an RDD with generated unique Long IDs.
-#'
-#' Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
-#' n is the number of partitions. So there may exist gaps, but this
-#' method won't trigger a spark job, which is different from
-#' zipWithIndex.
-#'
-#' @param x An RDD to be zipped.
-#' @return An RDD with zipped items.
-#' @seealso zipWithIndex
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
-#' collect(zipWithUniqueId(rdd))
-#' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
-#'}
-#' @rdname zipWithUniqueId
-#' @aliases zipWithUniqueId,RDD
+# Zip an RDD with generated unique Long IDs.
+#
+# Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
+# n is the number of partitions. So there may exist gaps, but this
+# method won't trigger a spark job, which is different from
+# zipWithIndex.
+#
+# @param x An RDD to be zipped.
+# @return An RDD with zipped items.
+# @seealso zipWithIndex
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+# collect(zipWithUniqueId(rdd))
+# # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
+#}
+# @rdname zipWithUniqueId
+# @aliases zipWithUniqueId,RDD
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
@@ -1353,28 +1353,28 @@ setMethod("zipWithUniqueId",
lapplyPartitionsWithIndex(x, partitionFunc)
})
-#' Zip an RDD with its element indices.
-#'
-#' The ordering is first based on the partition index and then the
-#' ordering of items within each partition. So the first item in
-#' the first partition gets index 0, and the last item in the last
-#' partition receives the largest index.
-#'
-#' This method needs to trigger a Spark job when this RDD contains
-#' more than one partition.
-#'
-#' @param x An RDD to be zipped.
-#' @return An RDD with zipped items.
-#' @seealso zipWithUniqueId
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
-#' collect(zipWithIndex(rdd))
-#' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
-#'}
-#' @rdname zipWithIndex
-#' @aliases zipWithIndex,RDD
+# Zip an RDD with its element indices.
+#
+# The ordering is first based on the partition index and then the
+# ordering of items within each partition. So the first item in
+# the first partition gets index 0, and the last item in the last
+# partition receives the largest index.
+#
+# This method needs to trigger a Spark job when this RDD contains
+# more than one partition.
+#
+# @param x An RDD to be zipped.
+# @return An RDD with zipped items.
+# @seealso zipWithUniqueId
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+# collect(zipWithIndex(rdd))
+# # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
+#}
+# @rdname zipWithIndex
+# @aliases zipWithIndex,RDD
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
@@ -1406,20 +1406,20 @@ setMethod("zipWithIndex",
lapplyPartitionsWithIndex(x, partitionFunc)
})
-#' Coalesce all elements within each partition of an RDD into a list.
-#'
-#' @param x An RDD.
-#' @return An RDD created by coalescing all elements within
-#' each partition into a list.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, as.list(1:4), 2L)
-#' collect(glom(rdd))
-#' # list(list(1, 2), list(3, 4))
-#'}
-#' @rdname glom
-#' @aliases glom,RDD
+# Coalesce all elements within each partition of an RDD into a list.
+#
+# @param x An RDD.
+# @return An RDD created by coalescing all elements within
+# each partition into a list.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, as.list(1:4), 2L)
+# collect(glom(rdd))
+# # list(list(1, 2), list(3, 4))
+#}
+# @rdname glom
+# @aliases glom,RDD
setMethod("glom",
signature(x = "RDD"),
function(x) {
@@ -1432,21 +1432,21 @@ setMethod("glom",
############ Binary Functions #############
-#' Return the union RDD of two RDDs.
-#' The same as union() in Spark.
-#'
-#' @param x An RDD.
-#' @param y An RDD.
-#' @return a new RDD created by performing the simple union (witout removing
-#' duplicates) of two input RDDs.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:3)
-#' unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
-#'}
-#' @rdname unionRDD
-#' @aliases unionRDD,RDD,RDD-method
+# Return the union RDD of two RDDs.
+# The same as union() in Spark.
+#
+# @param x An RDD.
+# @param y An RDD.
+# @return a new RDD created by performing the simple union (witout removing
+# duplicates) of two input RDDs.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:3)
+# unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
+#}
+# @rdname unionRDD
+# @aliases unionRDD,RDD,RDD-method
setMethod("unionRDD",
signature(x = "RDD", y = "RDD"),
function(x, y) {
@@ -1463,27 +1463,27 @@ setMethod("unionRDD",
union.rdd
})
-#' Zip an RDD with another RDD.
-#'
-#' Zips this RDD with another one, returning key-value pairs with the
-#' first element in each RDD second element in each RDD, etc. Assumes
-#' that the two RDDs have the same number of partitions and the same
-#' number of elements in each partition (e.g. one was made through
-#' a map on the other).
-#'
-#' @param x An RDD to be zipped.
-#' @param other Another RDD to be zipped.
-#' @return An RDD zipped from the two RDDs.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, 0:4)
-#' rdd2 <- parallelize(sc, 1000:1004)
-#' collect(zipRDD(rdd1, rdd2))
-#' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
-#'}
-#' @rdname zipRDD
-#' @aliases zipRDD,RDD
+# Zip an RDD with another RDD.
+#
+# Zips this RDD with another one, returning key-value pairs with the
+# first element in each RDD second element in each RDD, etc. Assumes
+# that the two RDDs have the same number of partitions and the same
+# number of elements in each partition (e.g. one was made through
+# a map on the other).
+#
+# @param x An RDD to be zipped.
+# @param other Another RDD to be zipped.
+# @return An RDD zipped from the two RDDs.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, 0:4)
+# rdd2 <- parallelize(sc, 1000:1004)
+# collect(zipRDD(rdd1, rdd2))
+# # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
+#}
+# @rdname zipRDD
+# @aliases zipRDD,RDD
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
@@ -1502,24 +1502,24 @@ setMethod("zipRDD",
mergePartitions(rdd, TRUE)
})
-#' Cartesian product of this RDD and another one.
-#'
-#' Return the Cartesian product of this RDD and another one,
-#' that is, the RDD of all pairs of elements (a, b) where a
-#' is in this and b is in other.
-#'
-#' @param x An RDD.
-#' @param other An RDD.
-#' @return A new RDD which is the Cartesian product of these two RDDs.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:2)
-#' sortByKey(cartesian(rdd, rdd))
-#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
-#'}
-#' @rdname cartesian
-#' @aliases cartesian,RDD,RDD-method
+# Cartesian product of this RDD and another one.
+#
+# Return the Cartesian product of this RDD and another one,
+# that is, the RDD of all pairs of elements (a, b) where a
+# is in this and b is in other.
+#
+# @param x An RDD.
+# @param other An RDD.
+# @return A new RDD which is the Cartesian product of these two RDDs.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:2)
+# sortByKey(cartesian(rdd, rdd))
+# # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
+#}
+# @rdname cartesian
+# @aliases cartesian,RDD,RDD-method
setMethod("cartesian",
signature(x = "RDD", other = "RDD"),
function(x, other) {
@@ -1532,24 +1532,24 @@ setMethod("cartesian",
mergePartitions(rdd, FALSE)
})
-#' Subtract an RDD with another RDD.
-#'
-#' Return an RDD with the elements from this that are not in other.
-#'
-#' @param x An RDD.
-#' @param other An RDD.
-#' @param numPartitions Number of the partitions in the result RDD.
-#' @return An RDD with the elements from this that are not in other.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
-#' rdd2 <- parallelize(sc, list(2, 4))
-#' collect(subtract(rdd1, rdd2))
-#' # list(1, 1, 3)
-#'}
-#' @rdname subtract
-#' @aliases subtract,RDD
+# Subtract an RDD with another RDD.
+#
+# Return an RDD with the elements from this that are not in other.
+#
+# @param x An RDD.
+# @param other An RDD.
+# @param numPartitions Number of the partitions in the result RDD.
+# @return An RDD with the elements from this that are not in other.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
+# rdd2 <- parallelize(sc, list(2, 4))
+# collect(subtract(rdd1, rdd2))
+# # list(1, 1, 3)
+#}
+# @rdname subtract
+# @aliases subtract,RDD
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
@@ -1559,28 +1559,28 @@ setMethod("subtract",
keys(subtractByKey(rdd1, rdd2, numPartitions))
})
-#' Intersection of this RDD and another one.
-#'
-#' Return the intersection of this RDD and another one.
-#' The output will not contain any duplicate elements,
-#' even if the input RDDs did. Performs a hash partition
-#' across the cluster.
-#' Note that this method performs a shuffle internally.
-#'
-#' @param x An RDD.
-#' @param other An RDD.
-#' @param numPartitions The number of partitions in the result RDD.
-#' @return An RDD which is the intersection of these two RDDs.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
-#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
-#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
-#' # list(1, 2, 3)
-#'}
-#' @rdname intersection
-#' @aliases intersection,RDD
+# Intersection of this RDD and another one.
+#
+# Return the intersection of this RDD and another one.
+# The output will not contain any duplicate elements,
+# even if the input RDDs did. Performs a hash partition
+# across the cluster.
+# Note that this method performs a shuffle internally.
+#
+# @param x An RDD.
+# @param other An RDD.
+# @param numPartitions The number of partitions in the result RDD.
+# @return An RDD which is the intersection of these two RDDs.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+# rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+# collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
+# # list(1, 2, 3)
+#}
+# @rdname intersection
+# @aliases intersection,RDD
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
@@ -1596,26 +1596,26 @@ setMethod("intersection",
keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})
-#' Zips an RDD's partitions with one (or more) RDD(s).
-#' Same as zipPartitions in Spark.
-#'
-#' @param ... RDDs to be zipped.
-#' @param func A function to transform zipped partitions.
-#' @return A new RDD by applying a function to the zipped partitions.
-#' Assumes that all the RDDs have the *same number of partitions*, but
-#' does *not* require them to have the same number of elements in each partition.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
-#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
-#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
-#' collect(zipPartitions(rdd1, rdd2, rdd3,
-#' func = function(x, y, z) { list(list(x, y, z))} ))
-#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
-#'}
-#' @rdname zipRDD
-#' @aliases zipPartitions,RDD
+# Zips an RDD's partitions with one (or more) RDD(s).
+# Same as zipPartitions in Spark.
+#
+# @param ... RDDs to be zipped.
+# @param func A function to transform zipped partitions.
+# @return A new RDD by applying a function to the zipped partitions.
+# Assumes that all the RDDs have the *same number of partitions*, but
+# does *not* require them to have the same number of elements in each partition.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
+# rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
+# rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
+# collect(zipPartitions(rdd1, rdd2, rdd3,
+# func = function(x, y, z) { list(list(x, y, z))} ))
+# # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
+#}
+# @rdname zipRDD
+# @aliases zipPartitions,RDD
setMethod("zipPartitions",
"RDD",
function(..., func) {
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 4f05ba524a..cae06e6af2 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -150,21 +150,21 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
dataFrame(sdf)
}
-#' toDF
-#'
-#' Converts an RDD to a DataFrame by infer the types.
-#'
-#' @param x An RDD
-#'
-#' @rdname DataFrame
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
-#' df <- toDF(rdd)
-#' }
+# toDF
+#
+# Converts an RDD to a DataFrame by infer the types.
+#
+# @param x An RDD
+#
+# @rdname DataFrame
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# sqlCtx <- sparkRSQL.init(sc)
+# rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
+# df <- toDF(rdd)
+# }
setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })
@@ -207,23 +207,23 @@ jsonFile <- function(sqlCtx, path) {
}
-#' JSON RDD
-#'
-#' Loads an RDD storing one JSON object per string as a DataFrame.
-#'
-#' @param sqlCtx SQLContext to use
-#' @param rdd An RDD of JSON string
-#' @param schema A StructType object to use as schema
-#' @param samplingRatio The ratio of simpling used to infer the schema
-#' @return A DataFrame
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlCtx <- sparkRSQL.init(sc)
-#' rdd <- texFile(sc, "path/to/json")
-#' df <- jsonRDD(sqlCtx, rdd)
-#' }
+# JSON RDD
+#
+# Loads an RDD storing one JSON object per string as a DataFrame.
+#
+# @param sqlCtx SQLContext to use
+# @param rdd An RDD of JSON string
+# @param schema A StructType object to use as schema
+# @param samplingRatio The ratio of simpling used to infer the schema
+# @return A DataFrame
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# sqlCtx <- sparkRSQL.init(sc)
+# rdd <- texFile(sc, "path/to/json")
+# df <- jsonRDD(sqlCtx, rdd)
+# }
# TODO: support schema
jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) {
diff --git a/R/pkg/R/broadcast.R b/R/pkg/R/broadcast.R
index 583fa2e7fd..23dc387807 100644
--- a/R/pkg/R/broadcast.R
+++ b/R/pkg/R/broadcast.R
@@ -23,21 +23,21 @@
.broadcastValues <- new.env()
.broadcastIdToName <- new.env()
-#' @title S4 class that represents a Broadcast variable
-#' @description Broadcast variables can be created using the broadcast
-#' function from a \code{SparkContext}.
-#' @rdname broadcast-class
-#' @seealso broadcast
-#'
-#' @param id Id of the backing Spark broadcast variable
-#' @export
+# @title S4 class that represents a Broadcast variable
+# @description Broadcast variables can be created using the broadcast
+# function from a \code{SparkContext}.
+# @rdname broadcast-class
+# @seealso broadcast
+#
+# @param id Id of the backing Spark broadcast variable
+# @export
setClass("Broadcast", slots = list(id = "character"))
-#' @rdname broadcast-class
-#' @param value Value of the broadcast variable
-#' @param jBroadcastRef reference to the backing Java broadcast object
-#' @param objName name of broadcasted object
-#' @export
+# @rdname broadcast-class
+# @param value Value of the broadcast variable
+# @param jBroadcastRef reference to the backing Java broadcast object
+# @param objName name of broadcasted object
+# @export
Broadcast <- function(id, value, jBroadcastRef, objName) {
.broadcastValues[[id]] <- value
.broadcastNames[[as.character(objName)]] <- jBroadcastRef
@@ -45,13 +45,13 @@ Broadcast <- function(id, value, jBroadcastRef, objName) {
new("Broadcast", id = id)
}
-#' @description
-#' \code{value} can be used to get the value of a broadcast variable inside
-#' a distributed function.
-#'
-#' @param bcast The broadcast variable to get
-#' @rdname broadcast
-#' @aliases value,Broadcast-method
+# @description
+# \code{value} can be used to get the value of a broadcast variable inside
+# a distributed function.
+#
+# @param bcast The broadcast variable to get
+# @rdname broadcast
+# @aliases value,Broadcast-method
setMethod("value",
signature(bcast = "Broadcast"),
function(bcast) {
@@ -62,24 +62,24 @@ setMethod("value",
}
})
-#' Internal function to set values of a broadcast variable.
-#'
-#' This function is used internally by Spark to set the value of a broadcast
-#' variable on workers. Not intended for use outside the package.
-#'
-#' @rdname broadcast-internal
-#' @seealso broadcast, value
+# Internal function to set values of a broadcast variable.
+#
+# This function is used internally by Spark to set the value of a broadcast
+# variable on workers. Not intended for use outside the package.
+#
+# @rdname broadcast-internal
+# @seealso broadcast, value
-#' @param bcastId The id of broadcast variable to set
-#' @param value The value to be set
-#' @export
+# @param bcastId The id of broadcast variable to set
+# @param value The value to be set
+# @export
setBroadcastValue <- function(bcastId, value) {
bcastIdStr <- as.character(bcastId)
.broadcastValues[[bcastIdStr]] <- value
}
-#' Helper function to clear the list of broadcast variables we know about
-#' Should be called when the SparkR JVM backend is shutdown
+# Helper function to clear the list of broadcast variables we know about
+# Should be called when the SparkR JVM backend is shutdown
clearBroadcastVariables <- function() {
bcasts <- ls(.broadcastNames)
rm(list = bcasts, envir = .broadcastNames)
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index b4845b6948..43be9c904f 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -25,27 +25,27 @@ getMinPartitions <- function(sc, minPartitions) {
as.integer(minPartitions)
}
-#' Create an RDD from a text file.
-#'
-#' This function reads a text file from HDFS, a local file system (available on all
-#' nodes), or any Hadoop-supported file system URI, and creates an
-#' RDD of strings from it.
-#'
-#' @param sc SparkContext to use
-#' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
-#' value is chosen based on available parallelism.
-#' @return RDD where each item is of type \code{character}
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' lines <- textFile(sc, "myfile.txt")
-#'}
+# Create an RDD from a text file.
+#
+# This function reads a text file from HDFS, a local file system (available on all
+# nodes), or any Hadoop-supported file system URI, and creates an
+# RDD of strings from it.
+#
+# @param sc SparkContext to use
+# @param path Path of file to read. A vector of multiple paths is allowed.
+# @param minPartitions Minimum number of partitions to be created. If NULL, the default
+# value is chosen based on available parallelism.
+# @return RDD where each item is of type \code{character}
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# lines <- textFile(sc, "myfile.txt")
+#}
textFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
- #' Convert a string vector of paths to a string containing comma separated paths
+ # Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")
jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
@@ -53,27 +53,27 @@ textFile <- function(sc, path, minPartitions = NULL) {
RDD(jrdd, "string")
}
-#' Load an RDD saved as a SequenceFile containing serialized objects.
-#'
-#' The file to be loaded should be one that was previously generated by calling
-#' saveAsObjectFile() of the RDD class.
-#'
-#' @param sc SparkContext to use
-#' @param path Path of file to read. A vector of multiple paths is allowed.
-#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
-#' value is chosen based on available parallelism.
-#' @return RDD containing serialized R objects.
-#' @seealso saveAsObjectFile
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- objectFile(sc, "myfile")
-#'}
+# Load an RDD saved as a SequenceFile containing serialized objects.
+#
+# The file to be loaded should be one that was previously generated by calling
+# saveAsObjectFile() of the RDD class.
+#
+# @param sc SparkContext to use
+# @param path Path of file to read. A vector of multiple paths is allowed.
+# @param minPartitions Minimum number of partitions to be created. If NULL, the default
+# value is chosen based on available parallelism.
+# @return RDD containing serialized R objects.
+# @seealso saveAsObjectFile
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- objectFile(sc, "myfile")
+#}
objectFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
- #' Convert a string vector of paths to a string containing comma separated paths
+ # Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")
jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
@@ -81,24 +81,24 @@ objectFile <- function(sc, path, minPartitions = NULL) {
RDD(jrdd, "byte")
}
-#' Create an RDD from a homogeneous list or vector.
-#'
-#' This function creates an RDD from a local homogeneous list in R. The elements
-#' in the list are split into \code{numSlices} slices and distributed to nodes
-#' in the cluster.
-#'
-#' @param sc SparkContext to use
-#' @param coll collection to parallelize
-#' @param numSlices number of partitions to create in the RDD
-#' @return an RDD created from this collection
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10, 2)
-#' # The RDD should contain 10 elements
-#' length(rdd)
-#'}
+# Create an RDD from a homogeneous list or vector.
+#
+# This function creates an RDD from a local homogeneous list in R. The elements
+# in the list are split into \code{numSlices} slices and distributed to nodes
+# in the cluster.
+#
+# @param sc SparkContext to use
+# @param coll collection to parallelize
+# @param numSlices number of partitions to create in the RDD
+# @return an RDD created from this collection
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10, 2)
+# # The RDD should contain 10 elements
+# length(rdd)
+#}
parallelize <- function(sc, coll, numSlices = 1) {
# TODO: bound/safeguard numSlices
# TODO: unit tests for if the split works for all primitives
@@ -133,33 +133,33 @@ parallelize <- function(sc, coll, numSlices = 1) {
RDD(jrdd, "byte")
}
-#' Include this specified package on all workers
-#'
-#' This function can be used to include a package on all workers before the
-#' user's code is executed. This is useful in scenarios where other R package
-#' functions are used in a function passed to functions like \code{lapply}.
-#' NOTE: The package is assumed to be installed on every node in the Spark
-#' cluster.
-#'
-#' @param sc SparkContext to use
-#' @param pkg Package name
-#'
-#' @export
-#' @examples
-#'\dontrun{
-#' library(Matrix)
-#'
-#' sc <- sparkR.init()
-#' # Include the matrix library we will be using
-#' includePackage(sc, Matrix)
-#'
-#' generateSparse <- function(x) {
-#' sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
-#' }
-#'
-#' rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
-#' collect(rdd)
-#'}
+# Include this specified package on all workers
+#
+# This function can be used to include a package on all workers before the
+# user's code is executed. This is useful in scenarios where other R package
+# functions are used in a function passed to functions like \code{lapply}.
+# NOTE: The package is assumed to be installed on every node in the Spark
+# cluster.
+#
+# @param sc SparkContext to use
+# @param pkg Package name
+#
+# @export
+# @examples
+#\dontrun{
+# library(Matrix)
+#
+# sc <- sparkR.init()
+# # Include the matrix library we will be using
+# includePackage(sc, Matrix)
+#
+# generateSparse <- function(x) {
+# sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
+# }
+#
+# rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
+# collect(rdd)
+#}
includePackage <- function(sc, pkg) {
pkg <- as.character(substitute(pkg))
if (exists(".packages", .sparkREnv)) {
@@ -171,30 +171,30 @@ includePackage <- function(sc, pkg) {
.sparkREnv$.packages <- packages
}
-#' @title Broadcast a variable to all workers
-#'
-#' @description
-#' Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
-#' object for reading it in distributed functions.
-#'
-#' @param sc Spark Context to use
-#' @param object Object to be broadcast
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:2, 2L)
-#'
-#' # Large Matrix object that we want to broadcast
-#' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
-#' randomMatBr <- broadcast(sc, randomMat)
-#'
-#' # Use the broadcast variable inside the function
-#' useBroadcast <- function(x) {
-#' sum(value(randomMatBr) * x)
-#' }
-#' sumRDD <- lapply(rdd, useBroadcast)
-#'}
+# @title Broadcast a variable to all workers
+#
+# @description
+# Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
+# object for reading it in distributed functions.
+#
+# @param sc Spark Context to use
+# @param object Object to be broadcast
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:2, 2L)
+#
+# # Large Matrix object that we want to broadcast
+# randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
+# randomMatBr <- broadcast(sc, randomMat)
+#
+# # Use the broadcast variable inside the function
+# useBroadcast <- function(x) {
+# sum(value(randomMatBr) * x)
+# }
+# sumRDD <- lapply(rdd, useBroadcast)
+#}
broadcast <- function(sc, object) {
objName <- as.character(substitute(object))
serializedObj <- serialize(object, connection = NULL)
@@ -205,21 +205,21 @@ broadcast <- function(sc, object) {
Broadcast(id, object, jBroadcast, objName)
}
-#' @title Set the checkpoint directory
-#'
-#' Set the directory under which RDDs are going to be checkpointed. The
-#' directory must be a HDFS path if running on a cluster.
-#'
-#' @param sc Spark Context to use
-#' @param dirName Directory path
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' setCheckpointDir(sc, "~/checkpoint")
-#' rdd <- parallelize(sc, 1:2, 2L)
-#' checkpoint(rdd)
-#'}
+# @title Set the checkpoint directory
+#
+# Set the directory under which RDDs are going to be checkpointed. The
+# directory must be a HDFS path if running on a cluster.
+#
+# @param sc Spark Context to use
+# @param dirName Directory path
+# @export
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# setCheckpointDir(sc, "~/checkpoint")
+# rdd <- parallelize(sc, 1:2, 2L)
+# checkpoint(rdd)
+#}
setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 5838955f74..380e8ebe8c 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -17,353 +17,353 @@
############ RDD Actions and Transformations ############
-#' @rdname aggregateRDD
-#' @seealso reduce
-#' @export
+# @rdname aggregateRDD
+# @seealso reduce
+# @export
setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
-#' @rdname cache-methods
-#' @export
+# @rdname cache-methods
+# @export
setGeneric("cache", function(x) { standardGeneric("cache") })
-#' @rdname coalesce
-#' @seealso repartition
-#' @export
+# @rdname coalesce
+# @seealso repartition
+# @export
setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
-#' @rdname checkpoint-methods
-#' @export
+# @rdname checkpoint-methods
+# @export
setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") })
-#' @rdname collect-methods
-#' @export
+# @rdname collect-methods
+# @export
setGeneric("collect", function(x, ...) { standardGeneric("collect") })
-#' @rdname collect-methods
-#' @export
+# @rdname collect-methods
+# @export
setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") })
-#' @rdname collect-methods
-#' @export
+# @rdname collect-methods
+# @export
setGeneric("collectPartition",
function(x, partitionId) {
standardGeneric("collectPartition")
})
-#' @rdname count
-#' @export
+# @rdname count
+# @export
setGeneric("count", function(x) { standardGeneric("count") })
-#' @rdname countByValue
-#' @export
+# @rdname countByValue
+# @export
setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
-#' @rdname distinct
-#' @export
+# @rdname distinct
+# @export
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
-#' @rdname filterRDD
-#' @export
+# @rdname filterRDD
+# @export
setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") })
-#' @rdname first
-#' @export
+# @rdname first
+# @export
setGeneric("first", function(x) { standardGeneric("first") })
-#' @rdname flatMap
-#' @export
+# @rdname flatMap
+# @export
setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") })
-#' @rdname fold
-#' @seealso reduce
-#' @export
+# @rdname fold
+# @seealso reduce
+# @export
setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") })
-#' @rdname foreach
-#' @export
+# @rdname foreach
+# @export
setGeneric("foreach", function(x, func) { standardGeneric("foreach") })
-#' @rdname foreach
-#' @export
+# @rdname foreach
+# @export
setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachPartition") })
# The jrdd accessor function.
setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") })
-#' @rdname glom
-#' @export
+# @rdname glom
+# @export
setGeneric("glom", function(x) { standardGeneric("glom") })
-#' @rdname keyBy
-#' @export
+# @rdname keyBy
+# @export
setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") })
-#' @rdname lapplyPartition
-#' @export
+# @rdname lapplyPartition
+# @export
setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") })
-#' @rdname lapplyPartitionsWithIndex
-#' @export
+# @rdname lapplyPartitionsWithIndex
+# @export
setGeneric("lapplyPartitionsWithIndex",
function(X, FUN) {
standardGeneric("lapplyPartitionsWithIndex")
})
-#' @rdname lapply
-#' @export
+# @rdname lapply
+# @export
setGeneric("map", function(X, FUN) { standardGeneric("map") })
-#' @rdname lapplyPartition
-#' @export
+# @rdname lapplyPartition
+# @export
setGeneric("mapPartitions", function(X, FUN) { standardGeneric("mapPartitions") })
-#' @rdname lapplyPartitionsWithIndex
-#' @export
+# @rdname lapplyPartitionsWithIndex
+# @export
setGeneric("mapPartitionsWithIndex",
function(X, FUN) { standardGeneric("mapPartitionsWithIndex") })
-#' @rdname maximum
-#' @export
+# @rdname maximum
+# @export
setGeneric("maximum", function(x) { standardGeneric("maximum") })
-#' @rdname minimum
-#' @export
+# @rdname minimum
+# @export
setGeneric("minimum", function(x) { standardGeneric("minimum") })
-#' @rdname sumRDD
-#' @export
+# @rdname sumRDD
+# @export
setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
-#' @rdname name
-#' @export
+# @rdname name
+# @export
setGeneric("name", function(x) { standardGeneric("name") })
-#' @rdname numPartitions
-#' @export
+# @rdname numPartitions
+# @export
setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") })
-#' @rdname persist
-#' @export
+# @rdname persist
+# @export
setGeneric("persist", function(x, newLevel) { standardGeneric("persist") })
-#' @rdname pipeRDD
-#' @export
+# @rdname pipeRDD
+# @export
setGeneric("pipeRDD", function(x, command, env = list()) { standardGeneric("pipeRDD")})
-#' @rdname reduce
-#' @export
+# @rdname reduce
+# @export
setGeneric("reduce", function(x, func) { standardGeneric("reduce") })
-#' @rdname repartition
-#' @seealso coalesce
-#' @export
+# @rdname repartition
+# @seealso coalesce
+# @export
setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
-#' @rdname sampleRDD
-#' @export
+# @rdname sampleRDD
+# @export
setGeneric("sampleRDD",
function(x, withReplacement, fraction, seed) {
standardGeneric("sampleRDD")
})
-#' @rdname saveAsObjectFile
-#' @seealso objectFile
-#' @export
+# @rdname saveAsObjectFile
+# @seealso objectFile
+# @export
setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") })
-#' @rdname saveAsTextFile
-#' @export
+# @rdname saveAsTextFile
+# @export
setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") })
-#' @rdname setName
-#' @export
+# @rdname setName
+# @export
setGeneric("setName", function(x, name) { standardGeneric("setName") })
-#' @rdname sortBy
-#' @export
+# @rdname sortBy
+# @export
setGeneric("sortBy",
function(x, func, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortBy")
})
-#' @rdname take
-#' @export
+# @rdname take
+# @export
setGeneric("take", function(x, num) { standardGeneric("take") })
-#' @rdname takeOrdered
-#' @export
+# @rdname takeOrdered
+# @export
setGeneric("takeOrdered", function(x, num) { standardGeneric("takeOrdered") })
-#' @rdname takeSample
-#' @export
+# @rdname takeSample
+# @export
setGeneric("takeSample",
function(x, withReplacement, num, seed) {
standardGeneric("takeSample")
})
-#' @rdname top
-#' @export
+# @rdname top
+# @export
setGeneric("top", function(x, num) { standardGeneric("top") })
-#' @rdname unionRDD
-#' @export
+# @rdname unionRDD
+# @export
setGeneric("unionRDD", function(x, y) { standardGeneric("unionRDD") })
-#' @rdname unpersist-methods
-#' @export
+# @rdname unpersist-methods
+# @export
setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
-#' @rdname zipRDD
-#' @export
+# @rdname zipRDD
+# @export
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
-#' @rdname zipRDD
-#' @export
+# @rdname zipRDD
+# @export
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
signature = "...")
-#' @rdname zipWithIndex
-#' @seealso zipWithUniqueId
-#' @export
+# @rdname zipWithIndex
+# @seealso zipWithUniqueId
+# @export
setGeneric("zipWithIndex", function(x) { standardGeneric("zipWithIndex") })
-#' @rdname zipWithUniqueId
-#' @seealso zipWithIndex
-#' @export
+# @rdname zipWithUniqueId
+# @seealso zipWithIndex
+# @export
setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") })
############ Binary Functions #############
-#' @rdname cartesian
-#' @export
+# @rdname cartesian
+# @export
setGeneric("cartesian", function(x, other) { standardGeneric("cartesian") })
-#' @rdname countByKey
-#' @export
+# @rdname countByKey
+# @export
setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
-#' @rdname flatMapValues
-#' @export
+# @rdname flatMapValues
+# @export
setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") })
-#' @rdname intersection
-#' @export
+# @rdname intersection
+# @export
setGeneric("intersection", function(x, other, numPartitions = 1) {
standardGeneric("intersection") })
-#' @rdname keys
-#' @export
+# @rdname keys
+# @export
setGeneric("keys", function(x) { standardGeneric("keys") })
-#' @rdname lookup
-#' @export
+# @rdname lookup
+# @export
setGeneric("lookup", function(x, key) { standardGeneric("lookup") })
-#' @rdname mapValues
-#' @export
+# @rdname mapValues
+# @export
setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") })
-#' @rdname sampleByKey
-#' @export
+# @rdname sampleByKey
+# @export
setGeneric("sampleByKey",
function(x, withReplacement, fractions, seed) {
standardGeneric("sampleByKey")
})
-#' @rdname values
-#' @export
+# @rdname values
+# @export
setGeneric("values", function(x) { standardGeneric("values") })
############ Shuffle Functions ############
-#' @rdname aggregateByKey
-#' @seealso foldByKey, combineByKey
-#' @export
+# @rdname aggregateByKey
+# @seealso foldByKey, combineByKey
+# @export
setGeneric("aggregateByKey",
function(x, zeroValue, seqOp, combOp, numPartitions) {
standardGeneric("aggregateByKey")
})
-#' @rdname cogroup
-#' @export
+# @rdname cogroup
+# @export
setGeneric("cogroup",
function(..., numPartitions) {
standardGeneric("cogroup")
},
signature = "...")
-#' @rdname combineByKey
-#' @seealso groupByKey, reduceByKey
-#' @export
+# @rdname combineByKey
+# @seealso groupByKey, reduceByKey
+# @export
setGeneric("combineByKey",
function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
standardGeneric("combineByKey")
})
-#' @rdname foldByKey
-#' @seealso aggregateByKey, combineByKey
-#' @export
+# @rdname foldByKey
+# @seealso aggregateByKey, combineByKey
+# @export
setGeneric("foldByKey",
function(x, zeroValue, func, numPartitions) {
standardGeneric("foldByKey")
})
-#' @rdname join-methods
-#' @export
+# @rdname join-methods
+# @export
setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") })
-#' @rdname groupByKey
-#' @seealso reduceByKey
-#' @export
+# @rdname groupByKey
+# @seealso reduceByKey
+# @export
setGeneric("groupByKey", function(x, numPartitions) { standardGeneric("groupByKey") })
-#' @rdname join-methods
-#' @export
+# @rdname join-methods
+# @export
setGeneric("join", function(x, y, ...) { standardGeneric("join") })
-#' @rdname join-methods
-#' @export
+# @rdname join-methods
+# @export
setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })
-#' @rdname partitionBy
-#' @export
+# @rdname partitionBy
+# @export
setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") })
-#' @rdname reduceByKey
-#' @seealso groupByKey
-#' @export
+# @rdname reduceByKey
+# @seealso groupByKey
+# @export
setGeneric("reduceByKey", function(x, combineFunc, numPartitions) { standardGeneric("reduceByKey")})
-#' @rdname reduceByKeyLocally
-#' @seealso reduceByKey
-#' @export
+# @rdname reduceByKeyLocally
+# @seealso reduceByKey
+# @export
setGeneric("reduceByKeyLocally",
function(x, combineFunc) {
standardGeneric("reduceByKeyLocally")
})
-#' @rdname join-methods
-#' @export
+# @rdname join-methods
+# @export
setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") })
-#' @rdname sortByKey
-#' @export
+# @rdname sortByKey
+# @export
setGeneric("sortByKey",
function(x, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortByKey")
})
-#' @rdname subtract
-#' @export
+# @rdname subtract
+# @export
setGeneric("subtract",
function(x, other, numPartitions = 1) {
standardGeneric("subtract")
})
-#' @rdname subtractByKey
-#' @export
+# @rdname subtractByKey
+# @export
setGeneric("subtractByKey",
function(x, other, numPartitions = 1) {
standardGeneric("subtractByKey")
@@ -372,8 +372,8 @@ setGeneric("subtractByKey",
################### Broadcast Variable Methods #################
-#' @rdname broadcast
-#' @export
+# @rdname broadcast
+# @export
setGeneric("value", function(bcast) { standardGeneric("value") })
@@ -477,8 +477,8 @@ setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
#' @export
setGeneric("sortDF", function(x, col, ...) { standardGeneric("sortDF") })
-#' @rdname tojson
-#' @export
+# @rdname tojson
+# @export
setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
#' @rdname DataFrame
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index edeb8d9f75..7694652856 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -21,23 +21,23 @@ NULL
############ Actions and Transformations ############
-#' Look up elements of a key in an RDD
-#'
-#' @description
-#' \code{lookup} returns a list of values in this RDD for key key.
-#'
-#' @param x The RDD to collect
-#' @param key The key to look up for
-#' @return a list of values in this RDD for key key
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' pairs <- list(c(1, 1), c(2, 2), c(1, 3))
-#' rdd <- parallelize(sc, pairs)
-#' lookup(rdd, 1) # list(1, 3)
-#'}
-#' @rdname lookup
-#' @aliases lookup,RDD-method
+# Look up elements of a key in an RDD
+#
+# @description
+# \code{lookup} returns a list of values in this RDD for key key.
+#
+# @param x The RDD to collect
+# @param key The key to look up for
+# @return a list of values in this RDD for key key
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# pairs <- list(c(1, 1), c(2, 2), c(1, 3))
+# rdd <- parallelize(sc, pairs)
+# lookup(rdd, 1) # list(1, 3)
+#}
+# @rdname lookup
+# @aliases lookup,RDD-method
setMethod("lookup",
signature(x = "RDD", key = "ANY"),
function(x, key) {
@@ -49,21 +49,21 @@ setMethod("lookup",
collect(valsRDD)
})
-#' Count the number of elements for each key, and return the result to the
-#' master as lists of (key, count) pairs.
-#'
-#' Same as countByKey in Spark.
-#'
-#' @param x The RDD to count keys.
-#' @return list of (key, count) pairs, where count is number of each key in rdd.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
-#' countByKey(rdd) # ("a", 2L), ("b", 1L)
-#'}
-#' @rdname countByKey
-#' @aliases countByKey,RDD-method
+# Count the number of elements for each key, and return the result to the
+# master as lists of (key, count) pairs.
+#
+# Same as countByKey in Spark.
+#
+# @param x The RDD to count keys.
+# @return list of (key, count) pairs, where count is number of each key in rdd.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
+# countByKey(rdd) # ("a", 2L), ("b", 1L)
+#}
+# @rdname countByKey
+# @aliases countByKey,RDD-method
setMethod("countByKey",
signature(x = "RDD"),
function(x) {
@@ -71,17 +71,17 @@ setMethod("countByKey",
countByValue(keys)
})
-#' Return an RDD with the keys of each tuple.
-#'
-#' @param x The RDD from which the keys of each tuple is returned.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-#' collect(keys(rdd)) # list(1, 3)
-#'}
-#' @rdname keys
-#' @aliases keys,RDD
+# Return an RDD with the keys of each tuple.
+#
+# @param x The RDD from which the keys of each tuple is returned.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+# collect(keys(rdd)) # list(1, 3)
+#}
+# @rdname keys
+# @aliases keys,RDD
setMethod("keys",
signature(x = "RDD"),
function(x) {
@@ -91,17 +91,17 @@ setMethod("keys",
lapply(x, func)
})
-#' Return an RDD with the values of each tuple.
-#'
-#' @param x The RDD from which the values of each tuple is returned.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-#' collect(values(rdd)) # list(2, 4)
-#'}
-#' @rdname values
-#' @aliases values,RDD
+# Return an RDD with the values of each tuple.
+#
+# @param x The RDD from which the values of each tuple is returned.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+# collect(values(rdd)) # list(2, 4)
+#}
+# @rdname values
+# @aliases values,RDD
setMethod("values",
signature(x = "RDD"),
function(x) {
@@ -111,23 +111,23 @@ setMethod("values",
lapply(x, func)
})
-#' Applies a function to all values of the elements, without modifying the keys.
-#'
-#' The same as `mapValues()' in Spark.
-#'
-#' @param X The RDD to apply the transformation.
-#' @param FUN the transformation to apply on the value of each element.
-#' @return a new RDD created by the transformation.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:10)
-#' makePairs <- lapply(rdd, function(x) { list(x, x) })
-#' collect(mapValues(makePairs, function(x) { x * 2) })
-#' Output: list(list(1,2), list(2,4), list(3,6), ...)
-#'}
-#' @rdname mapValues
-#' @aliases mapValues,RDD,function-method
+# Applies a function to all values of the elements, without modifying the keys.
+#
+# The same as `mapValues()' in Spark.
+#
+# @param X The RDD to apply the transformation.
+# @param FUN the transformation to apply on the value of each element.
+# @return a new RDD created by the transformation.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:10)
+# makePairs <- lapply(rdd, function(x) { list(x, x) })
+# collect(mapValues(makePairs, function(x) { x * 2) })
+# Output: list(list(1,2), list(2,4), list(3,6), ...)
+#}
+# @rdname mapValues
+# @aliases mapValues,RDD,function-method
setMethod("mapValues",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -137,23 +137,23 @@ setMethod("mapValues",
lapply(X, func)
})
-#' Pass each value in the key-value pair RDD through a flatMap function without
-#' changing the keys; this also retains the original RDD's partitioning.
-#'
-#' The same as 'flatMapValues()' in Spark.
-#'
-#' @param X The RDD to apply the transformation.
-#' @param FUN the transformation to apply on the value of each element.
-#' @return a new RDD created by the transformation.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
-#' collect(flatMapValues(rdd, function(x) { x }))
-#' Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
-#'}
-#' @rdname flatMapValues
-#' @aliases flatMapValues,RDD,function-method
+# Pass each value in the key-value pair RDD through a flatMap function without
+# changing the keys; this also retains the original RDD's partitioning.
+#
+# The same as 'flatMapValues()' in Spark.
+#
+# @param X The RDD to apply the transformation.
+# @param FUN the transformation to apply on the value of each element.
+# @return a new RDD created by the transformation.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
+# collect(flatMapValues(rdd, function(x) { x }))
+# Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
+#}
+# @rdname flatMapValues
+# @aliases flatMapValues,RDD,function-method
setMethod("flatMapValues",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -165,30 +165,30 @@ setMethod("flatMapValues",
############ Shuffle Functions ############
-#' Partition an RDD by key
-#'
-#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' For each element of this RDD, the partitioner is used to compute a hash
-#' function and the RDD is partitioned using this hash value.
-#'
-#' @param x The RDD to partition. Should be an RDD where each element is
-#' list(K, V) or c(K, V).
-#' @param numPartitions Number of partitions to create.
-#' @param ... Other optional arguments to partitionBy.
-#'
-#' @param partitionFunc The partition function to use. Uses a default hashCode
-#' function if not provided
-#' @return An RDD partitioned using the specified partitioner.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-#' rdd <- parallelize(sc, pairs)
-#' parts <- partitionBy(rdd, 2L)
-#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
-#'}
-#' @rdname partitionBy
-#' @aliases partitionBy,RDD,integer-method
+# Partition an RDD by key
+#
+# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
+# For each element of this RDD, the partitioner is used to compute a hash
+# function and the RDD is partitioned using this hash value.
+#
+# @param x The RDD to partition. Should be an RDD where each element is
+# list(K, V) or c(K, V).
+# @param numPartitions Number of partitions to create.
+# @param ... Other optional arguments to partitionBy.
+#
+# @param partitionFunc The partition function to use. Uses a default hashCode
+# function if not provided
+# @return An RDD partitioned using the specified partitioner.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+# rdd <- parallelize(sc, pairs)
+# parts <- partitionBy(rdd, 2L)
+# collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
+#}
+# @rdname partitionBy
+# @aliases partitionBy,RDD,integer-method
setMethod("partitionBy",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, partitionFunc = hashCode) {
@@ -234,27 +234,27 @@ setMethod("partitionBy",
RDD(r, serializedMode = "byte")
})
-#' Group values by key
-#'
-#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and group values for each key in the RDD into a single sequence.
-#'
-#' @param x The RDD to group. Should be an RDD where each element is
-#' list(K, V) or c(K, V).
-#' @param numPartitions Number of partitions to create.
-#' @return An RDD where each element is list(K, list(V))
-#' @seealso reduceByKey
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-#' rdd <- parallelize(sc, pairs)
-#' parts <- groupByKey(rdd, 2L)
-#' grouped <- collect(parts)
-#' grouped[[1]] # Should be a list(1, list(2, 4))
-#'}
-#' @rdname groupByKey
-#' @aliases groupByKey,RDD,integer-method
+# Group values by key
+#
+# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
+# and group values for each key in the RDD into a single sequence.
+#
+# @param x The RDD to group. Should be an RDD where each element is
+# list(K, V) or c(K, V).
+# @param numPartitions Number of partitions to create.
+# @return An RDD where each element is list(K, list(V))
+# @seealso reduceByKey
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+# rdd <- parallelize(sc, pairs)
+# parts <- groupByKey(rdd, 2L)
+# grouped <- collect(parts)
+# grouped[[1]] # Should be a list(1, list(2, 4))
+#}
+# @rdname groupByKey
+# @aliases groupByKey,RDD,integer-method
setMethod("groupByKey",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
@@ -292,28 +292,28 @@ setMethod("groupByKey",
lapplyPartition(shuffled, groupVals)
})
-#' Merge values by key
-#'
-#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function.
-#'
-#' @param x The RDD to reduce by key. Should be an RDD where each element is
-#' list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
-#' @param numPartitions Number of partitions to create.
-#' @return An RDD where each element is list(K, V') where V' is the merged
-#' value
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-#' rdd <- parallelize(sc, pairs)
-#' parts <- reduceByKey(rdd, "+", 2L)
-#' reduced <- collect(parts)
-#' reduced[[1]] # Should be a list(1, 6)
-#'}
-#' @rdname reduceByKey
-#' @aliases reduceByKey,RDD,integer-method
+# Merge values by key
+#
+# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
+# and merges the values for each key using an associative reduce function.
+#
+# @param x The RDD to reduce by key. Should be an RDD where each element is
+# list(K, V) or c(K, V).
+# @param combineFunc The associative reduce function to use.
+# @param numPartitions Number of partitions to create.
+# @return An RDD where each element is list(K, V') where V' is the merged
+# value
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+# rdd <- parallelize(sc, pairs)
+# parts <- reduceByKey(rdd, "+", 2L)
+# reduced <- collect(parts)
+# reduced[[1]] # Should be a list(1, 6)
+#}
+# @rdname reduceByKey
+# @aliases reduceByKey,RDD,integer-method
setMethod("reduceByKey",
signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
function(x, combineFunc, numPartitions) {
@@ -333,27 +333,27 @@ setMethod("reduceByKey",
lapplyPartition(shuffled, reduceVals)
})
-#' Merge values by key locally
-#'
-#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function, but return the
-#' results immediately to the driver as an R list.
-#'
-#' @param x The RDD to reduce by key. Should be an RDD where each element is
-#' list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
-#' @return A list of elements of type list(K, V') where V' is the merged value for each key
-#' @seealso reduceByKey
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-#' rdd <- parallelize(sc, pairs)
-#' reduced <- reduceByKeyLocally(rdd, "+")
-#' reduced # list(list(1, 6), list(1.1, 3))
-#'}
-#' @rdname reduceByKeyLocally
-#' @aliases reduceByKeyLocally,RDD,integer-method
+# Merge values by key locally
+#
+# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
+# and merges the values for each key using an associative reduce function, but return the
+# results immediately to the driver as an R list.
+#
+# @param x The RDD to reduce by key. Should be an RDD where each element is
+# list(K, V) or c(K, V).
+# @param combineFunc The associative reduce function to use.
+# @return A list of elements of type list(K, V') where V' is the merged value for each key
+# @seealso reduceByKey
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+# rdd <- parallelize(sc, pairs)
+# reduced <- reduceByKeyLocally(rdd, "+")
+# reduced # list(list(1, 6), list(1.1, 3))
+#}
+# @rdname reduceByKeyLocally
+# @aliases reduceByKeyLocally,RDD,integer-method
setMethod("reduceByKeyLocally",
signature(x = "RDD", combineFunc = "ANY"),
function(x, combineFunc) {
@@ -385,41 +385,41 @@ setMethod("reduceByKeyLocally",
convertEnvsToList(merged[[1]], merged[[2]])
})
-#' Combine values by key
-#'
-#' Generic function to combine the elements for each key using a custom set of
-#' aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
-#' for a "combined type" C. Note that V and C can be different -- for example, one
-#' might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
-
-#' Users provide three functions:
-#' \itemize{
-#' \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
-#' \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list) -
-#' \item mergeCombiners, to combine two C's into a single one (e.g., concatentates
-#' two lists).
-#' }
-#'
-#' @param x The RDD to combine. Should be an RDD where each element is
-#' list(K, V) or c(K, V).
-#' @param createCombiner Create a combiner (C) given a value (V)
-#' @param mergeValue Merge the given value (V) with an existing combiner (C)
-#' @param mergeCombiners Merge two combiners and return a new combiner
-#' @param numPartitions Number of partitions to create.
-#' @return An RDD where each element is list(K, C) where C is the combined type
-#'
-#' @seealso groupByKey, reduceByKey
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-#' rdd <- parallelize(sc, pairs)
-#' parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
-#' combined <- collect(parts)
-#' combined[[1]] # Should be a list(1, 6)
-#'}
-#' @rdname combineByKey
-#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
+# Combine values by key
+#
+# Generic function to combine the elements for each key using a custom set of
+# aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
+# for a "combined type" C. Note that V and C can be different -- for example, one
+# might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
+
+# Users provide three functions:
+# \itemize{
+# \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
+# \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list) -
+# \item mergeCombiners, to combine two C's into a single one (e.g., concatentates
+# two lists).
+# }
+#
+# @param x The RDD to combine. Should be an RDD where each element is
+# list(K, V) or c(K, V).
+# @param createCombiner Create a combiner (C) given a value (V)
+# @param mergeValue Merge the given value (V) with an existing combiner (C)
+# @param mergeCombiners Merge two combiners and return a new combiner
+# @param numPartitions Number of partitions to create.
+# @return An RDD where each element is list(K, C) where C is the combined type
+#
+# @seealso groupByKey, reduceByKey
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+# rdd <- parallelize(sc, pairs)
+# parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
+# combined <- collect(parts)
+# combined[[1]] # Should be a list(1, 6)
+#}
+# @rdname combineByKey
+# @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
setMethod("combineByKey",
signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
mergeCombiners = "ANY", numPartitions = "numeric"),
@@ -451,36 +451,36 @@ setMethod("combineByKey",
lapplyPartition(shuffled, mergeAfterShuffle)
})
-#' Aggregate a pair RDD by each key.
-#'
-#' Aggregate the values of each key in an RDD, using given combine functions
-#' and a neutral "zero value". This function can return a different result type,
-#' U, than the type of the values in this RDD, V. Thus, we need one operation
-#' for merging a V into a U and one operation for merging two U's, The former
-#' operation is used for merging values within a partition, and the latter is
-#' used for merging values between partitions. To avoid memory allocation, both
-#' of these functions are allowed to modify and return their first argument
-#' instead of creating a new U.
-#'
-#' @param x An RDD.
-#' @param zeroValue A neutral "zero value".
-#' @param seqOp A function to aggregate the values of each key. It may return
-#' a different result type from the type of the values.
-#' @param combOp A function to aggregate results of seqOp.
-#' @return An RDD containing the aggregation result.
-#' @seealso foldByKey, combineByKey
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-#' zeroValue <- list(0, 0)
-#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
-#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
-#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
-#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
-#'}
-#' @rdname aggregateByKey
-#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
+# Aggregate a pair RDD by each key.
+#
+# Aggregate the values of each key in an RDD, using given combine functions
+# and a neutral "zero value". This function can return a different result type,
+# U, than the type of the values in this RDD, V. Thus, we need one operation
+# for merging a V into a U and one operation for merging two U's, The former
+# operation is used for merging values within a partition, and the latter is
+# used for merging values between partitions. To avoid memory allocation, both
+# of these functions are allowed to modify and return their first argument
+# instead of creating a new U.
+#
+# @param x An RDD.
+# @param zeroValue A neutral "zero value".
+# @param seqOp A function to aggregate the values of each key. It may return
+# a different result type from the type of the values.
+# @param combOp A function to aggregate results of seqOp.
+# @return An RDD containing the aggregation result.
+# @seealso foldByKey, combineByKey
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+# zeroValue <- list(0, 0)
+# seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+# combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+# aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+# # list(list(1, list(3, 2)), list(2, list(7, 2)))
+#}
+# @rdname aggregateByKey
+# @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
setMethod("aggregateByKey",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
combOp = "ANY", numPartitions = "numeric"),
@@ -492,26 +492,26 @@ setMethod("aggregateByKey",
combineByKey(x, createCombiner, seqOp, combOp, numPartitions)
})
-#' Fold a pair RDD by each key.
-#'
-#' Aggregate the values of each key in an RDD, using an associative function "func"
-#' and a neutral "zero value" which may be added to the result an arbitrary
-#' number of times, and must not change the result (e.g., 0 for addition, or
-#' 1 for multiplication.).
-#'
-#' @param x An RDD.
-#' @param zeroValue A neutral "zero value".
-#' @param func An associative function for folding values of each key.
-#' @return An RDD containing the aggregation result.
-#' @seealso aggregateByKey, combineByKey
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
-#'}
-#' @rdname foldByKey
-#' @aliases foldByKey,RDD,ANY,ANY,integer-method
+# Fold a pair RDD by each key.
+#
+# Aggregate the values of each key in an RDD, using an associative function "func"
+# and a neutral "zero value" which may be added to the result an arbitrary
+# number of times, and must not change the result (e.g., 0 for addition, or
+# 1 for multiplication.).
+#
+# @param x An RDD.
+# @param zeroValue A neutral "zero value".
+# @param func An associative function for folding values of each key.
+# @return An RDD containing the aggregation result.
+# @seealso aggregateByKey, combineByKey
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+# foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
+#}
+# @rdname foldByKey
+# @aliases foldByKey,RDD,ANY,ANY,integer-method
setMethod("foldByKey",
signature(x = "RDD", zeroValue = "ANY",
func = "ANY", numPartitions = "numeric"),
@@ -521,28 +521,28 @@ setMethod("foldByKey",
############ Binary Functions #############
-#' Join two RDDs
-#'
-#' @description
-#' \code{join} This function joins two RDDs where every element is of the form list(K, V).
-#' The key types of the two RDDs should be the same.
-#'
-#' @param x An RDD to be joined. Should be an RDD where each element is
-#' list(K, V).
-#' @param y An RDD to be joined. Should be an RDD where each element is
-#' list(K, V).
-#' @param numPartitions Number of partitions to create.
-#' @return a new RDD containing all pairs of elements with matching keys in
-#' two input RDDs.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-#' join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3))
-#'}
-#' @rdname join-methods
-#' @aliases join,RDD,RDD-method
+# Join two RDDs
+#
+# @description
+# \code{join} This function joins two RDDs where every element is of the form list(K, V).
+# The key types of the two RDDs should be the same.
+#
+# @param x An RDD to be joined. Should be an RDD where each element is
+# list(K, V).
+# @param y An RDD to be joined. Should be an RDD where each element is
+# list(K, V).
+# @param numPartitions Number of partitions to create.
+# @return a new RDD containing all pairs of elements with matching keys in
+# two input RDDs.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+# join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3))
+#}
+# @rdname join-methods
+# @aliases join,RDD,RDD-method
setMethod("join",
signature(x = "RDD", y = "RDD"),
function(x, y, numPartitions) {
@@ -557,30 +557,30 @@ setMethod("join",
doJoin)
})
-#' Left outer join two RDDs
-#'
-#' @description
-#' \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V).
-#' The key types of the two RDDs should be the same.
-#'
-#' @param x An RDD to be joined. Should be an RDD where each element is
-#' list(K, V).
-#' @param y An RDD to be joined. Should be an RDD where each element is
-#' list(K, V).
-#' @param numPartitions Number of partitions to create.
-#' @return For each element (k, v) in x, the resulting RDD will either contain
-#' all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
-#' if no elements in rdd2 have key k.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-#' leftOuterJoin(rdd1, rdd2, 2L)
-#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
-#'}
-#' @rdname join-methods
-#' @aliases leftOuterJoin,RDD,RDD-method
+# Left outer join two RDDs
+#
+# @description
+# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V).
+# The key types of the two RDDs should be the same.
+#
+# @param x An RDD to be joined. Should be an RDD where each element is
+# list(K, V).
+# @param y An RDD to be joined. Should be an RDD where each element is
+# list(K, V).
+# @param numPartitions Number of partitions to create.
+# @return For each element (k, v) in x, the resulting RDD will either contain
+# all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
+# if no elements in rdd2 have key k.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+# leftOuterJoin(rdd1, rdd2, 2L)
+# # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
+#}
+# @rdname join-methods
+# @aliases leftOuterJoin,RDD,RDD-method
setMethod("leftOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -594,30 +594,30 @@ setMethod("leftOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
-#' Right outer join two RDDs
-#'
-#' @description
-#' \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V).
-#' The key types of the two RDDs should be the same.
-#'
-#' @param x An RDD to be joined. Should be an RDD where each element is
-#' list(K, V).
-#' @param y An RDD to be joined. Should be an RDD where each element is
-#' list(K, V).
-#' @param numPartitions Number of partitions to create.
-#' @return For each element (k, w) in y, the resulting RDD will either contain
-#' all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
-#' if no elements in x have key k.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-#' rightOuterJoin(rdd1, rdd2, 2L)
-#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
-#'}
-#' @rdname join-methods
-#' @aliases rightOuterJoin,RDD,RDD-method
+# Right outer join two RDDs
+#
+# @description
+# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V).
+# The key types of the two RDDs should be the same.
+#
+# @param x An RDD to be joined. Should be an RDD where each element is
+# list(K, V).
+# @param y An RDD to be joined. Should be an RDD where each element is
+# list(K, V).
+# @param numPartitions Number of partitions to create.
+# @return For each element (k, w) in y, the resulting RDD will either contain
+# all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
+# if no elements in x have key k.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+# rightOuterJoin(rdd1, rdd2, 2L)
+# # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
+#}
+# @rdname join-methods
+# @aliases rightOuterJoin,RDD,RDD-method
setMethod("rightOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -631,33 +631,33 @@ setMethod("rightOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
-#' Full outer join two RDDs
-#'
-#' @description
-#' \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
-#' The key types of the two RDDs should be the same.
-#'
-#' @param x An RDD to be joined. Should be an RDD where each element is
-#' list(K, V).
-#' @param y An RDD to be joined. Should be an RDD where each element is
-#' list(K, V).
-#' @param numPartitions Number of partitions to create.
-#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD
-#' will contain all pairs (k, (v, w)) for both (k, v) in x and
-#' (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
-#' in x/y have key k.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
-#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
-#' # list(1, list(3, 1)),
-#' # list(2, list(NULL, 4)))
-#' # list(3, list(3, NULL)),
-#'}
-#' @rdname join-methods
-#' @aliases fullOuterJoin,RDD,RDD-method
+# Full outer join two RDDs
+#
+# @description
+# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
+# The key types of the two RDDs should be the same.
+#
+# @param x An RDD to be joined. Should be an RDD where each element is
+# list(K, V).
+# @param y An RDD to be joined. Should be an RDD where each element is
+# list(K, V).
+# @param numPartitions Number of partitions to create.
+# @return For each element (k, v) in x and (k, w) in y, the resulting RDD
+# will contain all pairs (k, (v, w)) for both (k, v) in x and
+# (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
+# in x/y have key k.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
+# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+# fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
+# # list(1, list(3, 1)),
+# # list(2, list(NULL, 4)))
+# # list(3, list(3, NULL)),
+#}
+# @rdname join-methods
+# @aliases fullOuterJoin,RDD,RDD-method
setMethod("fullOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -671,23 +671,23 @@ setMethod("fullOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
-#' For each key k in several RDDs, return a resulting RDD that
-#' whose values are a list of values for the key in all RDDs.
-#'
-#' @param ... Several RDDs.
-#' @param numPartitions Number of partitions to create.
-#' @return a new RDD containing all pairs of elements with values in a list
-#' in all RDDs.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-#' cogroup(rdd1, rdd2, numPartitions = 2L)
-#' # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list()))
-#'}
-#' @rdname cogroup
-#' @aliases cogroup,RDD-method
+# For each key k in several RDDs, return a resulting RDD that
+# whose values are a list of values for the key in all RDDs.
+#
+# @param ... Several RDDs.
+# @param numPartitions Number of partitions to create.
+# @return a new RDD containing all pairs of elements with values in a list
+# in all RDDs.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+# cogroup(rdd1, rdd2, numPartitions = 2L)
+# # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list()))
+#}
+# @rdname cogroup
+# @aliases cogroup,RDD-method
setMethod("cogroup",
"RDD",
function(..., numPartitions) {
@@ -723,20 +723,20 @@ setMethod("cogroup",
group.func)
})
-#' Sort a (k, v) pair RDD by k.
-#'
-#' @param x A (k, v) pair RDD to be sorted.
-#' @param ascending A flag to indicate whether the sorting is ascending or descending.
-#' @param numPartitions Number of partitions to create.
-#' @return An RDD where all (k, v) pair elements are sorted.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
-#' collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
-#'}
-#' @rdname sortByKey
-#' @aliases sortByKey,RDD,RDD-method
+# Sort a (k, v) pair RDD by k.
+#
+# @param x A (k, v) pair RDD to be sorted.
+# @param ascending A flag to indicate whether the sorting is ascending or descending.
+# @param numPartitions Number of partitions to create.
+# @return An RDD where all (k, v) pair elements are sorted.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
+# collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
+#}
+# @rdname sortByKey
+# @aliases sortByKey,RDD,RDD-method
setMethod("sortByKey",
signature(x = "RDD"),
function(x, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
@@ -785,25 +785,25 @@ setMethod("sortByKey",
lapplyPartition(newRDD, partitionFunc)
})
-#' Subtract a pair RDD with another pair RDD.
-#'
-#' Return an RDD with the pairs from x whose keys are not in other.
-#'
-#' @param x An RDD.
-#' @param other An RDD.
-#' @param numPartitions Number of the partitions in the result RDD.
-#' @return An RDD with the pairs from x whose keys are not in other.
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
-#' list("b", 5), list("a", 2)))
-#' rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
-#' collect(subtractByKey(rdd1, rdd2))
-#' # list(list("b", 4), list("b", 5))
-#'}
-#' @rdname subtractByKey
-#' @aliases subtractByKey,RDD
+# Subtract a pair RDD with another pair RDD.
+#
+# Return an RDD with the pairs from x whose keys are not in other.
+#
+# @param x An RDD.
+# @param other An RDD.
+# @param numPartitions Number of the partitions in the result RDD.
+# @return An RDD with the pairs from x whose keys are not in other.
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
+# list("b", 5), list("a", 2)))
+# rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
+# collect(subtractByKey(rdd1, rdd2))
+# # list(list("b", 4), list("b", 5))
+#}
+# @rdname subtractByKey
+# @aliases subtractByKey,RDD
setMethod("subtractByKey",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
@@ -819,41 +819,41 @@ setMethod("subtractByKey",
function (v) { v[[1]] })
})
-#' Return a subset of this RDD sampled by key.
-#'
-#' @description
-#' \code{sampleByKey} Create a sample of this RDD using variable sampling rates
-#' for different keys as specified by fractions, a key to sampling rate map.
-#'
-#' @param x The RDD to sample elements by key, where each element is
-#' list(K, V) or c(K, V).
-#' @param withReplacement Sampling with replacement or not
-#' @param fraction The (rough) sample target fraction
-#' @param seed Randomness seed value
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' rdd <- parallelize(sc, 1:3000)
-#' pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
-#' else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
-#' fractions <- list(a = 0.2, b = 0.1, c = 0.3)
-#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
-#' 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
-#' 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
-#' 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
-#' lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
-#' lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
-#' lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
-#' lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
-#' lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
-#' lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
-#' fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
-#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
-#' fractions <- list(a = 0.2, b = 0.1)
-#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
-#'}
-#' @rdname sampleByKey
-#' @aliases sampleByKey,RDD-method
+# Return a subset of this RDD sampled by key.
+#
+# @description
+# \code{sampleByKey} Create a sample of this RDD using variable sampling rates
+# for different keys as specified by fractions, a key to sampling rate map.
+#
+# @param x The RDD to sample elements by key, where each element is
+# list(K, V) or c(K, V).
+# @param withReplacement Sampling with replacement or not
+# @param fraction The (rough) sample target fraction
+# @param seed Randomness seed value
+# @examples
+#\dontrun{
+# sc <- sparkR.init()
+# rdd <- parallelize(sc, 1:3000)
+# pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
+# else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
+# fractions <- list(a = 0.2, b = 0.1, c = 0.3)
+# sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
+# 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
+# 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
+# 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
+# lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
+# lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
+# lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
+# lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
+# lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
+# lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
+# fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
+# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
+# fractions <- list(a = 0.2, b = 0.1)
+# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
+#}
+# @rdname sampleByKey
+# @aliases sampleByKey,RDD-method
setMethod("sampleByKey",
signature(x = "RDD", withReplacement = "logical",
fractions = "vector", seed = "integer"),