author      felixcheung <felixcheung_m@hotmail.com>              2015-11-12 20:02:49 -0800
committer   Shivaram Venkataraman <shivaram@cs.berkeley.edu>     2015-11-12 20:02:49 -0800
commit      ed04846e144db5bdab247c0e1fe2a47b99155c82 (patch)
tree        5be48edeab8c70de790bf24d038843423602d804
parent      e71c07557c39e2f74bd20d2ab3a2fca88aa5dfbb (diff)
[SPARK-11263][SPARKR] lintr Throws Warnings on Commented Code in Documentation
Clean out hundreds of `style: Commented code should be removed.` warnings from lintr, like these:

```
/opt/spark-1.6.0-bin-hadoop2.6/R/pkg/R/DataFrame.R:513:3: style: Commented code should be removed.
  # sc <- sparkR.init()
  ^~~~~~~~~~~~~~~~~~~
/opt/spark-1.6.0-bin-hadoop2.6/R/pkg/R/DataFrame.R:514:3: style: Commented code should be removed.
  # sqlContext <- sparkRSQL.init(sc)
  ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/opt/spark-1.6.0-bin-hadoop2.6/R/pkg/R/DataFrame.R:515:3: style: Commented code should be removed.
  # path <- "path/to/file.json"
  ^~~~~~~~~~~~~~~~~~~~~~~~~~~
```

Converting the blocks to roxygen comments without `@export` or `@rdname` was tried first; neither works on its own. Instead, `#' @noRd` was added to suppress .Rd file generation.

Also updated `@family` for DataFrame functions to the longer descriptive text "DataFrame functions" instead of `dataframe_funcs`.

![image](https://cloud.githubusercontent.com/assets/8969467/10933937/17bf5b1e-8291-11e5-9777-40fc632105dc.png)

This covers *most* of the 'Commented code' warnings; a few that look legitimate were left alone.

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #9463 from felixcheung/rlintr.
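For reference, a minimal sketch of the roxygen2 pattern the diff below applies (the function `addInternal` is purely illustrative and not part of this commit): the example block keeps the `#'` prefix, so lintr no longer flags it as commented-out code, while `@noRd` tells roxygen2 to parse the block but skip generating an .Rd page for it.

```r
#' Add two numbers (internal helper, not part of the public API).
#'
#' @param x A numeric value
#' @param y A numeric value
#' @return The sum of x and y
#' @noRd
#' @examples
#'\dontrun{
#' addInternal(1, 2)
#'}
addInternal <- function(x, y) {
  x + y
}
```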
-rw-r--r--  R/pkg/R/DataFrame.R           232
-rw-r--r--  R/pkg/R/RDD.R                1585
-rw-r--r--  R/pkg/R/SQLContext.R           66
-rw-r--r--  R/pkg/R/context.R             235
-rw-r--r--  R/pkg/R/generics.R             18
-rw-r--r--  R/pkg/R/pairRDD.R             910
-rw-r--r--  R/pkg/R/sparkR.R                3
-rw-r--r--  R/pkg/inst/profile/shell.R      2
8 files changed, 1539 insertions, 1512 deletions
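To confirm the warnings are gone, lintr can be re-run on one of the touched files; a hedged sketch (the path assumes a checkout of the Spark source tree, and `lintr::lint()` is the stock single-file entry point):

```r
library(lintr)

# Before this patch, each "# "-prefixed example line in the roxygen blocks of
# DataFrame.R triggered "style: Commented code should be removed."; after the
# conversion to "#'" comments, lint() should no longer report those warnings.
lint("R/pkg/R/DataFrame.R")
```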
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index cc868069d1..fd105ba5bc 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -25,7 +25,7 @@ setOldClass("jobj")
#' @title S4 class that represents a DataFrame
#' @description DataFrames can be created using functions like \link{createDataFrame},
#' \link{jsonFile}, \link{table} etc.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname DataFrame
#' @docType class
#'
@@ -68,7 +68,7 @@ dataFrame <- function(sdf, isCached = FALSE) {
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname printSchema
#' @name printSchema
#' @export
@@ -93,7 +93,7 @@ setMethod("printSchema",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname schema
#' @name schema
#' @export
@@ -117,7 +117,7 @@ setMethod("schema",
#'
#' @param x A SparkSQL DataFrame
#' @param extended Logical. If extended is False, explain() only prints the physical plan.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname explain
#' @name explain
#' @export
@@ -148,7 +148,7 @@ setMethod("explain",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname isLocal
#' @name isLocal
#' @export
@@ -173,7 +173,7 @@ setMethod("isLocal",
#' @param x A SparkSQL DataFrame
#' @param numRows The number of rows to print. Defaults to 20.
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname showDF
#' @name showDF
#' @export
@@ -198,7 +198,7 @@ setMethod("showDF",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname show
#' @name show
#' @export
@@ -225,7 +225,7 @@ setMethod("show", "DataFrame",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname dtypes
#' @name dtypes
#' @export
@@ -251,7 +251,7 @@ setMethod("dtypes",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname columns
#' @name columns
#' @aliases names
@@ -272,7 +272,7 @@ setMethod("columns",
})
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname columns
#' @name names
setMethod("names",
@@ -281,7 +281,7 @@ setMethod("names",
columns(x)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname columns
#' @name names<-
setMethod("names<-",
@@ -300,7 +300,7 @@ setMethod("names<-",
#' @param x A SparkSQL DataFrame
#' @param tableName A character vector containing the name of the table
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname registerTempTable
#' @name registerTempTable
#' @export
@@ -328,7 +328,7 @@ setMethod("registerTempTable",
#' @param overwrite A logical argument indicating whether or not to overwrite
#' the existing rows in the table.
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname insertInto
#' @name insertInto
#' @export
@@ -353,7 +353,7 @@ setMethod("insertInto",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname cache
#' @name cache
#' @export
@@ -381,7 +381,7 @@ setMethod("cache",
#'
#' @param x The DataFrame to persist
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname persist
#' @name persist
#' @export
@@ -409,7 +409,7 @@ setMethod("persist",
#' @param x The DataFrame to unpersist
#' @param blocking Whether to block until all blocks are deleted
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname unpersist-methods
#' @name unpersist
#' @export
@@ -437,7 +437,7 @@ setMethod("unpersist",
#' @param x A SparkSQL DataFrame
#' @param numPartitions The number of partitions to use.
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname repartition
#' @name repartition
#' @export
@@ -456,25 +456,24 @@ setMethod("repartition",
dataFrame(sdf)
})
-# toJSON
-#
-# Convert the rows of a DataFrame into JSON objects and return an RDD where
-# each element contains a JSON string.
-#
-# @param x A SparkSQL DataFrame
-# @return A StringRRDD of JSON objects
-#
-# @family dataframe_funcs
-# @rdname tojson
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# sqlContext <- sparkRSQL.init(sc)
-# path <- "path/to/file.json"
-# df <- jsonFile(sqlContext, path)
-# newRDD <- toJSON(df)
-#}
+#' toJSON
+#'
+#' Convert the rows of a DataFrame into JSON objects and return an RDD where
+#' each element contains a JSON string.
+#'
+#' @param x A SparkSQL DataFrame
+#' @return A StringRRDD of JSON objects
+#' @family DataFrame functions
+#' @rdname tojson
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlContext, path)
+#' newRDD <- toJSON(df)
+#'}
setMethod("toJSON",
signature(x = "DataFrame"),
function(x) {
@@ -491,7 +490,7 @@ setMethod("toJSON",
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname saveAsParquetFile
#' @name saveAsParquetFile
#' @export
@@ -515,7 +514,7 @@ setMethod("saveAsParquetFile",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname distinct
#' @name distinct
#' @export
@@ -538,7 +537,7 @@ setMethod("distinct",
#
#' @description Returns a new DataFrame containing distinct rows in this DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname unique
#' @name unique
#' @aliases distinct
@@ -556,7 +555,7 @@ setMethod("unique",
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname sample
#' @aliases sample_frac
#' @export
@@ -580,7 +579,7 @@ setMethod("sample",
dataFrame(sdf)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname sample
#' @name sample_frac
setMethod("sample_frac",
@@ -596,7 +595,7 @@ setMethod("sample_frac",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname count
#' @name count
#' @aliases nrow
@@ -620,7 +619,7 @@ setMethod("count",
#'
#' @name nrow
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname nrow
#' @aliases count
setMethod("nrow",
@@ -633,7 +632,7 @@ setMethod("nrow",
#'
#' @param x a SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname ncol
#' @name ncol
#' @export
@@ -654,7 +653,7 @@ setMethod("ncol",
#' Returns the dimentions (number of rows and columns) of a DataFrame
#' @param x a SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname dim
#' @name dim
#' @export
@@ -678,7 +677,7 @@ setMethod("dim",
#' @param stringsAsFactors (Optional) A logical indicating whether or not string columns
#' should be converted to factors. FALSE by default.
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname collect
#' @name collect
#' @export
@@ -746,7 +745,7 @@ setMethod("collect",
#' @param num The number of rows to return
#' @return A new DataFrame containing the number of rows specified.
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname limit
#' @name limit
#' @export
@@ -767,7 +766,7 @@ setMethod("limit",
#' Take the first NUM rows of a DataFrame and return a the results as a data.frame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname take
#' @name take
#' @export
@@ -796,7 +795,7 @@ setMethod("take",
#' @param num The number of rows to return. Default is 6.
#' @return A data.frame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname head
#' @name head
#' @export
@@ -819,7 +818,7 @@ setMethod("head",
#'
#' @param x A SparkSQL DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname first
#' @name first
#' @export
@@ -837,23 +836,21 @@ setMethod("first",
take(x, 1)
})
-# toRDD
-#
-# Converts a Spark DataFrame to an RDD while preserving column names.
-#
-# @param x A Spark DataFrame
-#
-# @family dataframe_funcs
-# @rdname DataFrame
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# sqlContext <- sparkRSQL.init(sc)
-# path <- "path/to/file.json"
-# df <- jsonFile(sqlContext, path)
-# rdd <- toRDD(df)
-# }
+#' toRDD
+#'
+#' Converts a Spark DataFrame to an RDD while preserving column names.
+#'
+#' @param x A Spark DataFrame
+#'
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlContext, path)
+#' rdd <- toRDD(df)
+#'}
setMethod("toRDD",
signature(x = "DataFrame"),
function(x) {
@@ -874,7 +871,7 @@ setMethod("toRDD",
#' @return a GroupedData
#' @seealso GroupedData
#' @aliases group_by
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname groupBy
#' @name groupBy
#' @export
@@ -899,7 +896,7 @@ setMethod("groupBy",
groupedData(sgd)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname groupBy
#' @name group_by
setMethod("group_by",
@@ -913,7 +910,7 @@ setMethod("group_by",
#' Compute aggregates by specifying a list of columns
#'
#' @param x a DataFrame
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname agg
#' @name agg
#' @aliases summarize
@@ -924,7 +921,7 @@ setMethod("agg",
agg(groupBy(x), ...)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname agg
#' @name summarize
setMethod("summarize",
@@ -940,8 +937,8 @@ setMethod("summarize",
# the requested map function. #
###################################################################################
-# @family dataframe_funcs
-# @rdname lapply
+#' @rdname lapply
+#' @noRd
setMethod("lapply",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
@@ -949,24 +946,25 @@ setMethod("lapply",
lapply(rdd, FUN)
})
-# @family dataframe_funcs
-# @rdname lapply
+#' @rdname lapply
+#' @noRd
setMethod("map",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapply(X, FUN)
})
-# @family dataframe_funcs
-# @rdname flatMap
+#' @rdname flatMap
+#' @noRd
setMethod("flatMap",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
rdd <- toRDD(X)
flatMap(rdd, FUN)
})
-# @family dataframe_funcs
-# @rdname lapplyPartition
+
+#' @rdname lapplyPartition
+#' @noRd
setMethod("lapplyPartition",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
@@ -974,16 +972,16 @@ setMethod("lapplyPartition",
lapplyPartition(rdd, FUN)
})
-# @family dataframe_funcs
-# @rdname lapplyPartition
+#' @rdname lapplyPartition
+#' @noRd
setMethod("mapPartitions",
signature(X = "DataFrame", FUN = "function"),
function(X, FUN) {
lapplyPartition(X, FUN)
})
-# @family dataframe_funcs
-# @rdname foreach
+#' @rdname foreach
+#' @noRd
setMethod("foreach",
signature(x = "DataFrame", func = "function"),
function(x, func) {
@@ -991,8 +989,8 @@ setMethod("foreach",
foreach(rdd, func)
})
-# @family dataframe_funcs
-# @rdname foreach
+#' @rdname foreach
+#' @noRd
setMethod("foreachPartition",
signature(x = "DataFrame", func = "function"),
function(x, func) {
@@ -1091,7 +1089,7 @@ setMethod("[", signature(x = "DataFrame", i = "Column"),
#' @param select expression for the single Column or a list of columns to select from the DataFrame
#' @return A new DataFrame containing only the rows that meet the condition with selected columns
#' @export
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname subset
#' @name subset
#' @aliases [
@@ -1122,7 +1120,7 @@ setMethod("subset", signature(x = "DataFrame"),
#' @param col A list of columns or single Column or name
#' @return A new DataFrame with selected columns
#' @export
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname select
#' @name select
#' @family subsetting functions
@@ -1150,7 +1148,7 @@ setMethod("select", signature(x = "DataFrame", col = "character"),
}
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname select
#' @export
setMethod("select", signature(x = "DataFrame", col = "Column"),
@@ -1162,7 +1160,7 @@ setMethod("select", signature(x = "DataFrame", col = "Column"),
dataFrame(sdf)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname select
#' @export
setMethod("select",
@@ -1187,7 +1185,7 @@ setMethod("select",
#' @param expr A string containing a SQL expression
#' @param ... Additional expressions
#' @return A DataFrame
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname selectExpr
#' @name selectExpr
#' @export
@@ -1215,7 +1213,7 @@ setMethod("selectExpr",
#' @param colName A string containing the name of the new column.
#' @param col A Column expression.
#' @return A DataFrame with the new column added.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname withColumn
#' @name withColumn
#' @aliases mutate transform
@@ -1241,7 +1239,7 @@ setMethod("withColumn",
#' @param .data A DataFrame
#' @param col a named argument of the form name = col
#' @return A new DataFrame with the new columns added.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname withColumn
#' @name mutate
#' @aliases withColumn transform
@@ -1275,7 +1273,7 @@ setMethod("mutate",
})
#' @export
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname withColumn
#' @name transform
#' @aliases withColumn mutate
@@ -1293,7 +1291,7 @@ setMethod("transform",
#' @param existingCol The name of the column you want to change.
#' @param newCol The new column name.
#' @return A DataFrame with the column name changed.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname withColumnRenamed
#' @name withColumnRenamed
#' @export
@@ -1325,7 +1323,7 @@ setMethod("withColumnRenamed",
#' @param x A DataFrame
#' @param newCol A named pair of the form new_column_name = existing_column
#' @return A DataFrame with the column name changed.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname withColumnRenamed
#' @name rename
#' @aliases withColumnRenamed
@@ -1370,7 +1368,7 @@ setClassUnion("characterOrColumn", c("character", "Column"))
#' @param decreasing A logical argument indicating sorting order for columns when
#' a character vector is specified for col
#' @return A DataFrame where all elements are sorted.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname arrange
#' @name arrange
#' @aliases orderby
@@ -1397,7 +1395,7 @@ setMethod("arrange",
dataFrame(sdf)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname arrange
#' @export
setMethod("arrange",
@@ -1429,7 +1427,7 @@ setMethod("arrange",
do.call("arrange", c(x, jcols))
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname arrange
#' @name orderby
setMethod("orderBy",
@@ -1446,7 +1444,7 @@ setMethod("orderBy",
#' @param condition The condition to filter on. This may either be a Column expression
#' or a string containing a SQL statement
#' @return A DataFrame containing only the rows that meet the condition.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname filter
#' @name filter
#' @family subsetting functions
@@ -1470,7 +1468,7 @@ setMethod("filter",
dataFrame(sdf)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname filter
#' @name where
setMethod("where",
@@ -1491,7 +1489,7 @@ setMethod("where",
#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left',
#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
#' @return A DataFrame containing the result of the join operation.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname join
#' @name join
#' @export
@@ -1550,7 +1548,7 @@ setMethod("join",
#' be returned. If all.x is set to FALSE and all.y is set to TRUE, a right
#' outer join will be returned. If all.x and all.y are set to TRUE, a full
#' outer join will be returned.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname merge
#' @export
#' @examples
@@ -1682,7 +1680,7 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @return A DataFrame containing the result of the union.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname unionAll
#' @name unionAll
#' @export
@@ -1705,7 +1703,7 @@ setMethod("unionAll",
#'
#' @description Returns a new DataFrame containing rows of all parameters.
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname rbind
#' @name rbind
#' @aliases unionAll
@@ -1727,7 +1725,7 @@ setMethod("rbind",
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @return A DataFrame containing the result of the intersect.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname intersect
#' @name intersect
#' @export
@@ -1754,7 +1752,7 @@ setMethod("intersect",
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @return A DataFrame containing the result of the except operation.
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname except
#' @name except
#' @export
@@ -1794,7 +1792,7 @@ setMethod("except",
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname write.df
#' @name write.df
#' @aliases saveDF
@@ -1830,7 +1828,7 @@ setMethod("write.df",
callJMethod(df@sdf, "save", source, jmode, options)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname write.df
#' @name saveDF
#' @export
@@ -1861,7 +1859,7 @@ setMethod("saveDF",
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname saveAsTable
#' @name saveAsTable
#' @export
@@ -1902,7 +1900,7 @@ setMethod("saveAsTable",
#' @param col A string of name
#' @param ... Additional expressions
#' @return A DataFrame
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname describe
#' @name describe
#' @aliases summary
@@ -1925,7 +1923,7 @@ setMethod("describe",
dataFrame(sdf)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname describe
#' @name describe
setMethod("describe",
@@ -1940,7 +1938,7 @@ setMethod("describe",
#'
#' @description Computes statistics for numeric columns of the DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname summary
#' @name summary
setMethod("summary",
@@ -1965,7 +1963,7 @@ setMethod("summary",
#' @param cols Optional list of column names to consider.
#' @return A DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname nafunctions
#' @name dropna
#' @aliases na.omit
@@ -1995,7 +1993,7 @@ setMethod("dropna",
dataFrame(sdf)
})
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname nafunctions
#' @name na.omit
#' @export
@@ -2023,7 +2021,7 @@ setMethod("na.omit",
#' column is simply ignored.
#' @return A DataFrame
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname nafunctions
#' @name fillna
#' @export
@@ -2087,7 +2085,7 @@ setMethod("fillna",
#' @title Download data from a DataFrame into a data.frame
#' @param x a DataFrame
#' @return a data.frame
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname as.data.frame
#' @examples \dontrun{
#'
@@ -2108,7 +2106,7 @@ setMethod("as.data.frame",
#' the DataFrame is searched by R when evaluating a variable, so columns in
#' the DataFrame can be accessed by simply giving their names.
#'
-#' @family dataframe_funcs
+#' @family DataFrame functions
#' @rdname attach
#' @title Attach DataFrame to R search path
#' @param what (DataFrame) The DataFrame to attach
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 051e441d4e..47945c2825 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -19,16 +19,15 @@
setOldClass("jobj")
-# @title S4 class that represents an RDD
-# @description RDD can be created using functions like
-# \code{parallelize}, \code{textFile} etc.
-# @rdname RDD
-# @seealso parallelize, textFile
-#
-# @slot env An R environment that stores bookkeeping states of the RDD
-# @slot jrdd Java object reference to the backing JavaRDD
-# to an RDD
-# @export
+#' @title S4 class that represents an RDD
+#' @description RDD can be created using functions like
+#' \code{parallelize}, \code{textFile} etc.
+#' @rdname RDD
+#' @seealso parallelize, textFile
+#' @slot env An R environment that stores bookkeeping states of the RDD
+#' @slot jrdd Java object reference to the backing JavaRDD
+#' to an RDD
+#' @noRd
setClass("RDD",
slots = list(env = "environment",
jrdd = "jobj"))
@@ -111,14 +110,13 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
.Object
})
-# @rdname RDD
-# @export
-#
-# @param jrdd Java object reference to the backing JavaRDD
-# @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
-# stores strings, and "row" if the RDD stores the rows of a DataFrame
-# @param isCached TRUE if the RDD is cached
-# @param isCheckpointed TRUE if the RDD has been checkpointed
+#' @rdname RDD
+#' @noRd
+#' @param jrdd Java object reference to the backing JavaRDD
+#' @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
+#' stores strings, and "row" if the RDD stores the rows of a DataFrame
+#' @param isCached TRUE if the RDD is cached
+#' @param isCheckpointed TRUE if the RDD has been checkpointed
RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
isCheckpointed = FALSE) {
new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
@@ -201,19 +199,20 @@ setValidity("RDD",
############ Actions and Transformations ############
-# Persist an RDD
-#
-# Persist this RDD with the default storage level (MEMORY_ONLY).
-#
-# @param x The RDD to cache
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10, 2L)
-# cache(rdd)
-#}
-# @rdname cache-methods
-# @aliases cache,RDD-method
+#' Persist an RDD
+#'
+#' Persist this RDD with the default storage level (MEMORY_ONLY).
+#'
+#' @param x The RDD to cache
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' cache(rdd)
+#'}
+#' @rdname cache-methods
+#' @aliases cache,RDD-method
+#' @noRd
setMethod("cache",
signature(x = "RDD"),
function(x) {
@@ -222,22 +221,23 @@ setMethod("cache",
x
})
-# Persist an RDD
-#
-# Persist this RDD with the specified storage level. For details of the
-# supported storage levels, refer to
-# http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
-#
-# @param x The RDD to persist
-# @param newLevel The new storage level to be assigned
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10, 2L)
-# persist(rdd, "MEMORY_AND_DISK")
-#}
-# @rdname persist
-# @aliases persist,RDD-method
+#' Persist an RDD
+#'
+#' Persist this RDD with the specified storage level. For details of the
+#' supported storage levels, refer to
+#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+#'
+#' @param x The RDD to persist
+#' @param newLevel The new storage level to be assigned
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' persist(rdd, "MEMORY_AND_DISK")
+#'}
+#' @rdname persist
+#' @aliases persist,RDD-method
+#' @noRd
setMethod("persist",
signature(x = "RDD", newLevel = "character"),
function(x, newLevel = "MEMORY_ONLY") {
@@ -246,21 +246,22 @@ setMethod("persist",
x
})
-# Unpersist an RDD
-#
-# Mark the RDD as non-persistent, and remove all blocks for it from memory and
-# disk.
-#
-# @param x The RDD to unpersist
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10, 2L)
-# cache(rdd) # rdd@@env$isCached == TRUE
-# unpersist(rdd) # rdd@@env$isCached == FALSE
-#}
-# @rdname unpersist-methods
-# @aliases unpersist,RDD-method
+#' Unpersist an RDD
+#'
+#' Mark the RDD as non-persistent, and remove all blocks for it from memory and
+#' disk.
+#'
+#' @param x The RDD to unpersist
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' cache(rdd) # rdd@@env$isCached == TRUE
+#' unpersist(rdd) # rdd@@env$isCached == FALSE
+#'}
+#' @rdname unpersist-methods
+#' @aliases unpersist,RDD-method
+#' @noRd
setMethod("unpersist",
signature(x = "RDD"),
function(x) {
@@ -269,24 +270,25 @@ setMethod("unpersist",
x
})
-# Checkpoint an RDD
-#
-# Mark this RDD for checkpointing. It will be saved to a file inside the
-# checkpoint directory set with setCheckpointDir() and all references to its
-# parent RDDs will be removed. This function must be called before any job has
-# been executed on this RDD. It is strongly recommended that this RDD is
-# persisted in memory, otherwise saving it on a file will require recomputation.
-#
-# @param x The RDD to checkpoint
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# setCheckpointDir(sc, "checkpoint")
-# rdd <- parallelize(sc, 1:10, 2L)
-# checkpoint(rdd)
-#}
-# @rdname checkpoint-methods
-# @aliases checkpoint,RDD-method
+#' Checkpoint an RDD
+#'
+#' Mark this RDD for checkpointing. It will be saved to a file inside the
+#' checkpoint directory set with setCheckpointDir() and all references to its
+#' parent RDDs will be removed. This function must be called before any job has
+#' been executed on this RDD. It is strongly recommended that this RDD is
+#' persisted in memory, otherwise saving it on a file will require recomputation.
+#'
+#' @param x The RDD to checkpoint
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setCheckpointDir(sc, "checkpoint")
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' checkpoint(rdd)
+#'}
+#' @rdname checkpoint-methods
+#' @aliases checkpoint,RDD-method
+#' @noRd
setMethod("checkpoint",
signature(x = "RDD"),
function(x) {
@@ -296,18 +298,19 @@ setMethod("checkpoint",
x
})
-# Gets the number of partitions of an RDD
-#
-# @param x A RDD.
-# @return the number of partitions of rdd as an integer.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10, 2L)
-# numPartitions(rdd) # 2L
-#}
-# @rdname numPartitions
-# @aliases numPartitions,RDD-method
+#' Gets the number of partitions of an RDD
+#'
+#' @param x A RDD.
+#' @return the number of partitions of rdd as an integer.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' numPartitions(rdd) # 2L
+#'}
+#' @rdname numPartitions
+#' @aliases numPartitions,RDD-method
+#' @noRd
setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
@@ -316,24 +319,25 @@ setMethod("numPartitions",
callJMethod(partitions, "size")
})
-# Collect elements of an RDD
-#
-# @description
-# \code{collect} returns a list that contains all of the elements in this RDD.
-#
-# @param x The RDD to collect
-# @param ... Other optional arguments to collect
-# @param flatten FALSE if the list should not flattened
-# @return a list containing elements in the RDD
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10, 2L)
-# collect(rdd) # list from 1 to 10
-# collectPartition(rdd, 0L) # list from 1 to 5
-#}
-# @rdname collect-methods
-# @aliases collect,RDD-method
+#' Collect elements of an RDD
+#'
+#' @description
+#' \code{collect} returns a list that contains all of the elements in this RDD.
+#'
+#' @param x The RDD to collect
+#' @param ... Other optional arguments to collect
+#' @param flatten FALSE if the list should not flattened
+#' @return a list containing elements in the RDD
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' collect(rdd) # list from 1 to 10
+#' collectPartition(rdd, 0L) # list from 1 to 5
+#'}
+#' @rdname collect-methods
+#' @aliases collect,RDD-method
+#' @noRd
setMethod("collect",
signature(x = "RDD"),
function(x, flatten = TRUE) {
@@ -344,12 +348,13 @@ setMethod("collect",
})
-# @description
-# \code{collectPartition} returns a list that contains all of the elements
-# in the specified partition of the RDD.
-# @param partitionId the partition to collect (starts from 0)
-# @rdname collect-methods
-# @aliases collectPartition,integer,RDD-method
+#' @description
+#' \code{collectPartition} returns a list that contains all of the elements
+#' in the specified partition of the RDD.
+#' @param partitionId the partition to collect (starts from 0)
+#' @rdname collect-methods
+#' @aliases collectPartition,integer,RDD-method
+#' @noRd
setMethod("collectPartition",
signature(x = "RDD", partitionId = "integer"),
function(x, partitionId) {
@@ -362,17 +367,18 @@ setMethod("collectPartition",
serializedMode = getSerializedMode(x))
})
-# @description
-# \code{collectAsMap} returns a named list as a map that contains all of the elements
-# in a key-value pair RDD.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
-# collectAsMap(rdd) # list(`1` = 2, `3` = 4)
-#}
-# @rdname collect-methods
-# @aliases collectAsMap,RDD-method
+#' @description
+#' \code{collectAsMap} returns a named list as a map that contains all of the elements
+#' in a key-value pair RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
+#' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
+#'}
+#' @rdname collect-methods
+#' @aliases collectAsMap,RDD-method
+#' @noRd
setMethod("collectAsMap",
signature(x = "RDD"),
function(x) {
@@ -382,19 +388,20 @@ setMethod("collectAsMap",
as.list(map)
})
-# Return the number of elements in the RDD.
-#
-# @param x The RDD to count
-# @return number of elements in the RDD.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# count(rdd) # 10
-# length(rdd) # Same as count
-#}
-# @rdname count
-# @aliases count,RDD-method
+#' Return the number of elements in the RDD.
+#'
+#' @param x The RDD to count
+#' @return number of elements in the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' count(rdd) # 10
+#' length(rdd) # Same as count
+#'}
+#' @rdname count
+#' @aliases count,RDD-method
+#' @noRd
setMethod("count",
signature(x = "RDD"),
function(x) {
@@ -406,31 +413,32 @@ setMethod("count",
sum(as.integer(vals))
})
-# Return the number of elements in the RDD
-# @export
-# @rdname count
+#' Return the number of elements in the RDD
+#' @rdname count
+#' @noRd
setMethod("length",
signature(x = "RDD"),
function(x) {
count(x)
})
-# Return the count of each unique value in this RDD as a list of
-# (value, count) pairs.
-#
-# Same as countByValue in Spark.
-#
-# @param x The RDD to count
-# @return list of (value, count) pairs, where count is number of each unique
-# value in rdd.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, c(1,2,3,2,1))
-# countByValue(rdd) # (1,2L), (2,2L), (3,1L)
-#}
-# @rdname countByValue
-# @aliases countByValue,RDD-method
+#' Return the count of each unique value in this RDD as a list of
+#' (value, count) pairs.
+#'
+#' Same as countByValue in Spark.
+#'
+#' @param x The RDD to count
+#' @return list of (value, count) pairs, where count is number of each unique
+#' value in rdd.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, c(1,2,3,2,1))
+#' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
+#'}
+#' @rdname countByValue
+#' @aliases countByValue,RDD-method
+#' @noRd
setMethod("countByValue",
signature(x = "RDD"),
function(x) {
@@ -438,23 +446,24 @@ setMethod("countByValue",
collect(reduceByKey(ones, `+`, numPartitions(x)))
})
-# Apply a function to all elements
-#
-# This function creates a new RDD by applying the given transformation to all
-# elements of the given RDD
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on each element
-# @return a new RDD created by the transformation.
-# @rdname lapply
-# @aliases lapply
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
-# collect(multiplyByTwo) # 2,4,6...
-#}
+#' Apply a function to all elements
+#'
+#' This function creates a new RDD by applying the given transformation to all
+#' elements of the given RDD
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on each element
+#' @return a new RDD created by the transformation.
+#' @rdname lapply
+#' @noRd
+#' @aliases lapply
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
+#' collect(multiplyByTwo) # 2,4,6...
+#'}
setMethod("lapply",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -464,31 +473,33 @@ setMethod("lapply",
lapplyPartitionsWithIndex(X, func)
})
-# @rdname lapply
-# @aliases map,RDD,function-method
+#' @rdname lapply
+#' @aliases map,RDD,function-method
+#' @noRd
setMethod("map",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapply(X, FUN)
})
-# Flatten results after apply a function to all elements
-#
-# This function return a new RDD by first applying a function to all
-# elements of this RDD, and then flattening the results.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on each element
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
-# collect(multiplyByTwo) # 2,20,4,40,6,60...
-#}
-# @rdname flatMap
-# @aliases flatMap,RDD,function-method
+#' Flatten results after apply a function to all elements
+#'
+#' This function return a new RDD by first applying a function to all
+#' elements of this RDD, and then flattening the results.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on each element
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
+#' collect(multiplyByTwo) # 2,20,4,40,6,60...
+#'}
+#' @rdname flatMap
+#' @aliases flatMap,RDD,function-method
+#' @noRd
setMethod("flatMap",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -501,83 +512,88 @@ setMethod("flatMap",
lapplyPartition(X, partitionFunc)
})
-# Apply a function to each partition of an RDD
-#
-# Return a new RDD by applying a function to each partition of this RDD.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on each partition.
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
-# collect(partitionSum) # 15, 40
-#}
-# @rdname lapplyPartition
-# @aliases lapplyPartition,RDD,function-method
+#' Apply a function to each partition of an RDD
+#'
+#' Return a new RDD by applying a function to each partition of this RDD.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on each partition.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
+#' collect(partitionSum) # 15, 40
+#'}
+#' @rdname lapplyPartition
+#' @aliases lapplyPartition,RDD,function-method
+#' @noRd
setMethod("lapplyPartition",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartitionsWithIndex(X, function(s, part) { FUN(part) })
})
-# mapPartitions is the same as lapplyPartition.
-#
-# @rdname lapplyPartition
-# @aliases mapPartitions,RDD,function-method
+#' mapPartitions is the same as lapplyPartition.
+#'
+#' @rdname lapplyPartition
+#' @aliases mapPartitions,RDD,function-method
+#' @noRd
setMethod("mapPartitions",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartition(X, FUN)
})
-# Return a new RDD by applying a function to each partition of this RDD, while
-# tracking the index of the original partition.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on each partition; takes the partition
-# index and a list of elements in the particular partition.
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10, 5L)
-# prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
-# partIndex * Reduce("+", part) })
-# collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
-#}
-# @rdname lapplyPartitionsWithIndex
-# @aliases lapplyPartitionsWithIndex,RDD,function-method
+#' Return a new RDD by applying a function to each partition of this RDD, while
+#' tracking the index of the original partition.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on each partition; takes the partition
+#' index and a list of elements in the particular partition.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 5L)
+#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
+#' partIndex * Reduce("+", part) })
+#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
+#'}
+#' @rdname lapplyPartitionsWithIndex
+#' @aliases lapplyPartitionsWithIndex,RDD,function-method
+#' @noRd
setMethod("lapplyPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
PipelinedRDD(X, FUN)
})
-# @rdname lapplyPartitionsWithIndex
-# @aliases mapPartitionsWithIndex,RDD,function-method
+#' @rdname lapplyPartitionsWithIndex
+#' @aliases mapPartitionsWithIndex,RDD,function-method
+#' @noRd
setMethod("mapPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
lapplyPartitionsWithIndex(X, FUN)
})
-# This function returns a new RDD containing only the elements that satisfy
-# a predicate (i.e. returning TRUE in a given logical function).
-# The same as `filter()' in Spark.
-#
-# @param x The RDD to be filtered.
-# @param f A unary predicate function.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
-#}
-# @rdname filterRDD
-# @aliases filterRDD,RDD,function-method
+#' This function returns a new RDD containing only the elements that satisfy
+#' a predicate (i.e. returning TRUE in a given logical function).
+#' The same as `filter()' in Spark.
+#'
+#' @param x The RDD to be filtered.
+#' @param f A unary predicate function.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
+#'}
+#' @rdname filterRDD
+#' @aliases filterRDD,RDD,function-method
+#' @noRd
setMethod("filterRDD",
signature(x = "RDD", f = "function"),
function(x, f) {
@@ -587,30 +603,32 @@ setMethod("filterRDD",
lapplyPartition(x, filter.func)
})
-# @rdname filterRDD
-# @aliases Filter
+#' @rdname filterRDD
+#' @aliases Filter
+#' @noRd
setMethod("Filter",
signature(f = "function", x = "RDD"),
function(f, x) {
filterRDD(x, f)
})
-# Reduce across elements of an RDD.
-#
-# This function reduces the elements of this RDD using the
-# specified commutative and associative binary operator.
-#
-# @param x The RDD to reduce
-# @param func Commutative and associative function to apply on elements
-# of the RDD.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# reduce(rdd, "+") # 55
-#}
-# @rdname reduce
-# @aliases reduce,RDD,ANY-method
+#' Reduce across elements of an RDD.
+#'
+#' This function reduces the elements of this RDD using the
+#' specified commutative and associative binary operator.
+#'
+#' @param x The RDD to reduce
+#' @param func Commutative and associative function to apply on elements
+#' of the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' reduce(rdd, "+") # 55
+#'}
+#' @rdname reduce
+#' @aliases reduce,RDD,ANY-method
+#' @noRd
setMethod("reduce",
signature(x = "RDD", func = "ANY"),
function(x, func) {
@@ -624,70 +642,74 @@ setMethod("reduce",
Reduce(func, partitionList)
})
-# Get the maximum element of an RDD.
-#
-# @param x The RDD to get the maximum element from
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# maximum(rdd) # 10
-#}
-# @rdname maximum
-# @aliases maximum,RDD
+#' Get the maximum element of an RDD.
+#'
+#' @param x The RDD to get the maximum element from
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' maximum(rdd) # 10
+#'}
+#' @rdname maximum
+#' @aliases maximum,RDD
+#' @noRd
setMethod("maximum",
signature(x = "RDD"),
function(x) {
reduce(x, max)
})
-# Get the minimum element of an RDD.
-#
-# @param x The RDD to get the minimum element from
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# minimum(rdd) # 1
-#}
-# @rdname minimum
-# @aliases minimum,RDD
+#' Get the minimum element of an RDD.
+#'
+#' @param x The RDD to get the minimum element from
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' minimum(rdd) # 1
+#'}
+#' @rdname minimum
+#' @aliases minimum,RDD
+#' @noRd
setMethod("minimum",
signature(x = "RDD"),
function(x) {
reduce(x, min)
})
-# Add up the elements in an RDD.
-#
-# @param x The RDD to add up the elements in
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# sumRDD(rdd) # 55
-#}
-# @rdname sumRDD
-# @aliases sumRDD,RDD
+#' Add up the elements in an RDD.
+#'
+#' @param x The RDD to add up the elements in
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' sumRDD(rdd) # 55
+#'}
+#' @rdname sumRDD
+#' @aliases sumRDD,RDD
+#' @noRd
setMethod("sumRDD",
signature(x = "RDD"),
function(x) {
reduce(x, "+")
})
-# Applies a function to all elements in an RDD, and force evaluation.
-#
-# @param x The RDD to apply the function
-# @param func The function to be applied.
-# @return invisible NULL.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# foreach(rdd, function(x) { save(x, file=...) })
-#}
-# @rdname foreach
-# @aliases foreach,RDD,function-method
+#' Applies a function to all elements in an RDD, and force evaluation.
+#'
+#' @param x The RDD to apply the function
+#' @param func The function to be applied.
+#' @return invisible NULL.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' foreach(rdd, function(x) { save(x, file=...) })
+#'}
+#' @rdname foreach
+#' @aliases foreach,RDD,function-method
+#' @noRd
setMethod("foreach",
signature(x = "RDD", func = "function"),
function(x, func) {
@@ -698,37 +720,39 @@ setMethod("foreach",
invisible(collect(mapPartitions(x, partition.func)))
})
-# Applies a function to each partition in an RDD, and force evaluation.
-#
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# foreachPartition(rdd, function(part) { save(part, file=...); NULL })
-#}
-# @rdname foreach
-# @aliases foreachPartition,RDD,function-method
+#' Applies a function to each partition in an RDD, and force evaluation.
+#'
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' foreachPartition(rdd, function(part) { save(part, file=...); NULL })
+#'}
+#' @rdname foreach
+#' @aliases foreachPartition,RDD,function-method
+#' @noRd
setMethod("foreachPartition",
signature(x = "RDD", func = "function"),
function(x, func) {
invisible(collect(mapPartitions(x, func)))
})
-# Take elements from an RDD.
-#
-# This function takes the first NUM elements in the RDD and
-# returns them in a list.
-#
-# @param x The RDD to take elements from
-# @param num Number of elements to take
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# take(rdd, 2L) # list(1, 2)
-#}
-# @rdname take
-# @aliases take,RDD,numeric-method
+#' Take elements from an RDD.
+#'
+#' This function takes the first NUM elements in the RDD and
+#' returns them in a list.
+#'
+#' @param x The RDD to take elements from
+#' @param num Number of elements to take
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' take(rdd, 2L) # list(1, 2)
+#'}
+#' @rdname take
+#' @aliases take,RDD,numeric-method
+#' @noRd
setMethod("take",
signature(x = "RDD", num = "numeric"),
function(x, num) {
@@ -763,39 +787,40 @@ setMethod("take",
})
-# First
-#
-# Return the first element of an RDD
-#
-# @rdname first
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# first(rdd)
-# }
+#' First
+#'
+#' Return the first element of an RDD
+#'
+#' @rdname first
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' first(rdd)
+#' }
+#' @noRd
setMethod("first",
signature(x = "RDD"),
function(x) {
take(x, 1)[[1]]
})
-# Removes the duplicates from RDD.
-#
-# This function returns a new RDD containing the distinct elements in the
-# given RDD. The same as `distinct()' in Spark.
-#
-# @param x The RDD to remove duplicates from.
-# @param numPartitions Number of partitions to create.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, c(1,2,2,3,3,3))
-# sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
-#}
-# @rdname distinct
-# @aliases distinct,RDD-method
+#' Removes the duplicates from RDD.
+#'
+#' This function returns a new RDD containing the distinct elements in the
+#' given RDD. The same as `distinct()' in Spark.
+#'
+#' @param x The RDD to remove duplicates from.
+#' @param numPartitions Number of partitions to create.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, c(1,2,2,3,3,3))
+#' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
+#'}
+#' @rdname distinct
+#' @aliases distinct,RDD-method
+#' @noRd
setMethod("distinct",
signature(x = "RDD"),
function(x, numPartitions = SparkR:::numPartitions(x)) {
@@ -807,24 +832,25 @@ setMethod("distinct",
resRDD
})
-# Return an RDD that is a sampled subset of the given RDD.
-#
-# The same as `sample()' in Spark. (We rename it due to signature
-# inconsistencies with the `sample()' function in R's base package.)
-#
-# @param x The RDD to sample elements from
-# @param withReplacement Sampling with replacement or not
-# @param fraction The (rough) sample target fraction
-# @param seed Randomness seed value
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
-# collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
-#}
-# @rdname sampleRDD
-# @aliases sampleRDD,RDD
+#' Return an RDD that is a sampled subset of the given RDD.
+#'
+#' The same as `sample()' in Spark. (We rename it due to signature
+#' inconsistencies with the `sample()' function in R's base package.)
+#'
+#' @param x The RDD to sample elements from
+#' @param withReplacement Sampling with replacement or not
+#' @param fraction The (rough) sample target fraction
+#' @param seed Randomness seed value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
+#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
+#'}
+#' @rdname sampleRDD
+#' @aliases sampleRDD,RDD
+#' @noRd
setMethod("sampleRDD",
signature(x = "RDD", withReplacement = "logical",
fraction = "numeric", seed = "integer"),
@@ -868,23 +894,24 @@ setMethod("sampleRDD",
lapplyPartitionsWithIndex(x, samplingFunc)
})
-# Return a list of the elements that are a sampled subset of the given RDD.
-#
-# @param x The RDD to sample elements from
-# @param withReplacement Sampling with replacement or not
-# @param num Number of elements to return
-# @param seed Randomness seed value
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:100)
-# # exactly 5 elements sampled, which may not be distinct
-# takeSample(rdd, TRUE, 5L, 1618L)
-# # exactly 5 distinct elements sampled
-# takeSample(rdd, FALSE, 5L, 16181618L)
-#}
-# @rdname takeSample
-# @aliases takeSample,RDD
+#' Return a list of the elements that are a sampled subset of the given RDD.
+#'
+#' @param x The RDD to sample elements from
+#' @param withReplacement Sampling with replacement or not
+#' @param num Number of elements to return
+#' @param seed Randomness seed value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:100)
+#' # exactly 5 elements sampled, which may not be distinct
+#' takeSample(rdd, TRUE, 5L, 1618L)
+#' # exactly 5 distinct elements sampled
+#' takeSample(rdd, FALSE, 5L, 16181618L)
+#'}
+#' @rdname takeSample
+#' @aliases takeSample,RDD
+#' @noRd
setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
num = "integer", seed = "integer"),
function(x, withReplacement, num, seed) {
@@ -931,18 +958,19 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
base::sample(samples)[1:total]
})
-# Creates tuples of the elements in this RDD by applying a function.
-#
-# @param x The RDD.
-# @param func The function to be applied.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(1, 2, 3))
-# collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
-#}
-# @rdname keyBy
-# @aliases keyBy,RDD
+#' Creates tuples of the elements in this RDD by applying a function.
+#'
+#' @param x The RDD.
+#' @param func The function to be applied.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3))
+#' collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
+#'}
+#' @rdname keyBy
+#' @aliases keyBy,RDD
+#' @noRd
setMethod("keyBy",
signature(x = "RDD", func = "function"),
function(x, func) {
@@ -952,44 +980,46 @@ setMethod("keyBy",
lapply(x, apply.func)
})
-# Return a new RDD that has exactly numPartitions partitions.
-# Can increase or decrease the level of parallelism in this RDD. Internally,
-# this uses a shuffle to redistribute data.
-# If you are decreasing the number of partitions in this RDD, consider using
-# coalesce, which can avoid performing a shuffle.
-#
-# @param x The RDD.
-# @param numPartitions Number of partitions to create.
-# @seealso coalesce
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
-# numPartitions(rdd) # 4
-# numPartitions(repartition(rdd, 2L)) # 2
-#}
-# @rdname repartition
-# @aliases repartition,RDD
+#' Return a new RDD that has exactly numPartitions partitions.
+#' Can increase or decrease the level of parallelism in this RDD. Internally,
+#' this uses a shuffle to redistribute data.
+#' If you are decreasing the number of partitions in this RDD, consider using
+#' coalesce, which can avoid performing a shuffle.
+#'
+#' @param x The RDD.
+#' @param numPartitions Number of partitions to create.
+#' @seealso coalesce
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
+#' numPartitions(rdd) # 4
+#' numPartitions(repartition(rdd, 2L)) # 2
+#'}
+#' @rdname repartition
+#' @aliases repartition,RDD
+#' @noRd
setMethod("repartition",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
coalesce(x, numPartitions, TRUE)
})
-# Return a new RDD that is reduced into numPartitions partitions.
-#
-# @param x The RDD.
-# @param numPartitions Number of partitions to create.
-# @seealso repartition
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
-# numPartitions(rdd) # 3
-# numPartitions(coalesce(rdd, 1L)) # 1
-#}
-# @rdname coalesce
-# @aliases coalesce,RDD
+#' Return a new RDD that is reduced into numPartitions partitions.
+#'
+#' @param x The RDD.
+#' @param numPartitions Number of partitions to create.
+#' @seealso repartition
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
+#' numPartitions(rdd) # 3
+#' numPartitions(coalesce(rdd, 1L)) # 1
+#'}
+#' @rdname coalesce
+#' @aliases coalesce,RDD
+#' @noRd
setMethod("coalesce",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
@@ -1013,19 +1043,20 @@ setMethod("coalesce",
}
})
-# Save this RDD as a SequenceFile of serialized objects.
-#
-# @param x The RDD to save
-# @param path The directory where the file is saved
-# @seealso objectFile
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:3)
-# saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
-#}
-# @rdname saveAsObjectFile
-# @aliases saveAsObjectFile,RDD
+#' Save this RDD as a SequenceFile of serialized objects.
+#'
+#' @param x The RDD to save
+#' @param path The directory where the file is saved
+#' @seealso objectFile
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
+#'}
+#' @rdname saveAsObjectFile
+#' @aliases saveAsObjectFile,RDD
+#' @noRd
setMethod("saveAsObjectFile",
signature(x = "RDD", path = "character"),
function(x, path) {
@@ -1038,18 +1069,19 @@ setMethod("saveAsObjectFile",
invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path))
})
-# Save this RDD as a text file, using string representations of elements.
-#
-# @param x The RDD to save
-# @param path The directory where the partitions of the text file are saved
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:3)
-# saveAsTextFile(rdd, "/tmp/sparkR-tmp")
-#}
-# @rdname saveAsTextFile
-# @aliases saveAsTextFile,RDD
+#' Save this RDD as a text file, using string representations of elements.
+#'
+#' @param x The RDD to save
+#' @param path The directory where the partitions of the text file are saved
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' saveAsTextFile(rdd, "/tmp/sparkR-tmp")
+#'}
+#' @rdname saveAsTextFile
+#' @aliases saveAsTextFile,RDD
+#' @noRd
setMethod("saveAsTextFile",
signature(x = "RDD", path = "character"),
function(x, path) {
@@ -1062,21 +1094,22 @@ setMethod("saveAsTextFile",
callJMethod(getJRDD(stringRdd, serializedMode = "string"), "saveAsTextFile", path))
})
-# Sort an RDD by the given key function.
-#
-# @param x An RDD to be sorted.
-# @param func A function used to compute the sort key for each element.
-# @param ascending A flag to indicate whether the sorting is ascending or descending.
-# @param numPartitions Number of partitions to create.
-# @return An RDD where all elements are sorted.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(3, 2, 1))
-# collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
-#}
-# @rdname sortBy
-# @aliases sortBy,RDD,RDD-method
+#' Sort an RDD by the given key function.
+#'
+#' @param x An RDD to be sorted.
+#' @param func A function used to compute the sort key for each element.
+#' @param ascending A flag to indicate whether the sorting is ascending or descending.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where all elements are sorted.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(3, 2, 1))
+#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
+#'}
+#' @rdname sortBy
+#' @aliases sortBy,RDD,RDD-method
+#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
function(x, func, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
@@ -1138,97 +1171,95 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList
}
-# Returns the first N elements from an RDD in ascending order.
-#
-# @param x An RDD.
-# @param num Number of elements to return.
-# @return The first N elements from the RDD in ascending order.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
-# takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
-#}
-# @rdname takeOrdered
-# @aliases takeOrdered,RDD,RDD-method
+#' Returns the first N elements from an RDD in ascending order.
+#'
+#' @param x An RDD.
+#' @param num Number of elements to return.
+#' @return The first N elements from the RDD in ascending order.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
+#'}
+#' @rdname takeOrdered
+#' @aliases takeOrdered,RDD,RDD-method
+#' @noRd
setMethod("takeOrdered",
signature(x = "RDD", num = "integer"),
function(x, num) {
takeOrderedElem(x, num)
})
-# Returns the top N elements from an RDD.
-#
-# @param x An RDD.
-# @param num Number of elements to return.
-# @return The top N elements from the RDD.
-# @rdname top
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
-# top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
-#}
-# @rdname top
-# @aliases top,RDD,RDD-method
+#' Returns the top N elements from an RDD.
+#'
+#' @param x An RDD.
+#' @param num Number of elements to return.
+#' @return The top N elements from the RDD.
+#' @rdname top
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
+#'}
+#' @aliases top,RDD,RDD-method
+#' @noRd
setMethod("top",
signature(x = "RDD", num = "integer"),
function(x, num) {
takeOrderedElem(x, num, FALSE)
})
-# Fold an RDD using a given associative function and a neutral "zero value".
-#
-# Aggregate the elements of each partition, and then the results for all the
-# partitions, using a given associative function and a neutral "zero value".
-#
-# @param x An RDD.
-# @param zeroValue A neutral "zero value".
-# @param op An associative function for the folding operation.
-# @return The folding result.
-# @rdname fold
-# @seealso reduce
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
-# fold(rdd, 0, "+") # 15
-#}
-# @rdname fold
-# @aliases fold,RDD,RDD-method
+#' Fold an RDD using a given associative function and a neutral "zero value".
+#'
+#' Aggregate the elements of each partition, and then the results for all the
+#' partitions, using a given associative function and a neutral "zero value".
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param op An associative function for the folding operation.
+#' @return The folding result.
+#' @rdname fold
+#' @seealso reduce
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
+#' fold(rdd, 0, "+") # 15
+#'}
+#' @aliases fold,RDD,RDD-method
+#' @noRd
setMethod("fold",
signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
function(x, zeroValue, op) {
aggregateRDD(x, zeroValue, op, op)
})
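The folding semantics can be sketched in plain R, outside Spark: the zero value seeds the reduction within each (hypothetical) partition, and the per-partition results are folded again with the same operation.
```
# Plain-R sketch of fold's semantics, assuming two partitions of the data;
# an illustration only, not SparkR's implementation.
op <- `+`
zeroValue <- 0
partitions <- list(c(1, 2, 3), c(4, 5))
perPartition <- vapply(partitions, function(p) Reduce(op, p, zeroValue), numeric(1))
Reduce(op, perPartition, zeroValue)  # 15, matching fold(rdd, 0, "+") above
```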
-# Aggregate an RDD using the given combine functions and a neutral "zero value".
-#
-# Aggregate the elements of each partition, and then the results for all the
-# partitions, using given combine functions and a neutral "zero value".
-#
-# @param x An RDD.
-# @param zeroValue A neutral "zero value".
-# @param seqOp A function to aggregate the RDD elements. It may return a different
-# result type from the type of the RDD elements.
-# @param combOp A function to aggregate results of seqOp.
-# @return The aggregation result.
-# @rdname aggregateRDD
-# @seealso reduce
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(1, 2, 3, 4))
-# zeroValue <- list(0, 0)
-# seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
-# combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
-# aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
-#}
-# @rdname aggregateRDD
-# @aliases aggregateRDD,RDD,RDD-method
+#' Aggregate an RDD using the given combine functions and a neutral "zero value".
+#'
+#' Aggregate the elements of each partition, and then the results for all the
+#' partitions, using given combine functions and a neutral "zero value".
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param seqOp A function to aggregate the RDD elements. It may return a different
+#' result type from the type of the RDD elements.
+#' @param combOp A function to aggregate results of seqOp.
+#' @return The aggregation result.
+#' @rdname aggregateRDD
+#' @seealso reduce
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4))
+#' zeroValue <- list(0, 0)
+#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
+#'}
+#' @aliases aggregateRDD,RDD,RDD-method
+#' @noRd
setMethod("aggregateRDD",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
function(x, zeroValue, seqOp, combOp) {
@@ -1241,25 +1272,24 @@ setMethod("aggregateRDD",
Reduce(combOp, partitionList, zeroValue)
})
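The two-level aggregation can be illustrated in plain R; the partition split below is only an assumption made for the sketch.
```
# Plain-R sketch of aggregateRDD's two-level aggregation (sum and count),
# assuming the data 1:4 is split into two partitions.
zeroValue <- list(0, 0)
seqOp  <- function(acc, v) list(acc[[1]] + v, acc[[2]] + 1)
combOp <- function(a, b)  list(a[[1]] + b[[1]], a[[2]] + b[[2]])
partitions <- list(c(1, 2), c(3, 4))
perPartition <- lapply(partitions, function(p) Reduce(seqOp, p, zeroValue))
Reduce(combOp, perPartition, zeroValue)  # list(10, 4), as in the example above
```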
-# Pipes elements to a forked external process.
-#
-# The same as 'pipe()' in Spark.
-#
-# @param x The RDD whose elements are piped to the forked external process.
-# @param command The command to fork an external process.
-# @param env A named list to set environment variables of the external process.
-# @return A new RDD created by piping all elements to a forked external process.
-# @rdname pipeRDD
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# collect(pipeRDD(rdd, "more")
-# Output: c("1", "2", ..., "10")
-#}
-# @rdname pipeRDD
-# @aliases pipeRDD,RDD,character-method
+#' Pipes elements to a forked external process.
+#'
+#' The same as 'pipe()' in Spark.
+#'
+#' @param x The RDD whose elements are piped to the forked external process.
+#' @param command The command to fork an external process.
+#' @param env A named list to set environment variables of the external process.
+#' @return A new RDD created by piping all elements to a forked external process.
+#' @rdname pipeRDD
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' collect(pipeRDD(rdd, "more"))
+#' # Output: c("1", "2", ..., "10")
+#'}
+#' @aliases pipeRDD,RDD,character-method
+#' @noRd
setMethod("pipeRDD",
signature(x = "RDD", command = "character"),
function(x, command, env = list()) {
@@ -1274,42 +1304,40 @@ setMethod("pipeRDD",
lapplyPartition(x, func)
})
-# TODO: Consider caching the name in the RDD's environment
-# Return an RDD's name.
-#
-# @param x The RDD whose name is returned.
-# @rdname name
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(1,2,3))
-# name(rdd) # NULL (if not set before)
-#}
-# @rdname name
-# @aliases name,RDD
+# TODO: Consider caching the name in the RDD's environment
+#' Return an RDD's name.
+#'
+#' @param x The RDD whose name is returned.
+#' @rdname name
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1,2,3))
+#' name(rdd) # NULL (if not set before)
+#'}
+#' @aliases name,RDD
+#' @noRd
setMethod("name",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "name")
})
-# Set an RDD's name.
-#
-# @param x The RDD whose name is to be set.
-# @param name The RDD name to be set.
-# @return a new RDD renamed.
-# @rdname setName
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(1,2,3))
-# setName(rdd, "myRDD")
-# name(rdd) # "myRDD"
-#}
-# @rdname setName
-# @aliases setName,RDD
+#' Set an RDD's name.
+#'
+#' @param x The RDD whose name is to be set.
+#' @param name The RDD name to be set.
+#' @return a new RDD renamed.
+#' @rdname setName
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1,2,3))
+#' setName(rdd, "myRDD")
+#' name(rdd) # "myRDD"
+#'}
+#' @aliases setName,RDD
+#' @noRd
setMethod("setName",
signature(x = "RDD", name = "character"),
function(x, name) {
@@ -1317,25 +1345,26 @@ setMethod("setName",
x
})
-# Zip an RDD with generated unique Long IDs.
-#
-# Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
-# n is the number of partitions. So there may exist gaps, but this
-# method won't trigger a spark job, which is different from
-# zipWithIndex.
-#
-# @param x An RDD to be zipped.
-# @return An RDD with zipped items.
-# @seealso zipWithIndex
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
-# collect(zipWithUniqueId(rdd))
-# # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
-#}
-# @rdname zipWithUniqueId
-# @aliases zipWithUniqueId,RDD
+#' Zip an RDD with generated unique Long IDs.
+#'
+#' Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
+#' n is the number of partitions. So there may exist gaps, but this
+#' method won't trigger a spark job, which is different from
+#' zipWithIndex.
+#'
+#' @param x An RDD to be zipped.
+#' @return An RDD with zipped items.
+#' @seealso zipWithIndex
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+#' collect(zipWithUniqueId(rdd))
+#' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
+#'}
+#' @rdname zipWithUniqueId
+#' @aliases zipWithUniqueId,RDD
+#' @noRd
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
@@ -1354,28 +1383,29 @@ setMethod("zipWithUniqueId",
lapplyPartitionsWithIndex(x, partitionFunc)
})
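The id scheme described above (item j of partition k gets id k + j * n) can be reproduced in plain R:
```
# Plain-R sketch of zipWithUniqueId's numbering: item j (0-based) in
# partition k of an RDD with n partitions gets id k + j * n, so gaps are
# possible but no Spark job is needed.
partitions <- list(c("a", "b"), c("c", "d"), c("e"))
n <- length(partitions)
ids <- lapply(seq_along(partitions) - 1, function(k) {
  k + (seq_along(partitions[[k + 1]]) - 1) * n
})
Map(function(items, id) Map(list, items, id), partitions, ids)
# "a"->0, "b"->3, "c"->1, "d"->4, "e"->2, matching the example above
```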
-# Zip an RDD with its element indices.
-#
-# The ordering is first based on the partition index and then the
-# ordering of items within each partition. So the first item in
-# the first partition gets index 0, and the last item in the last
-# partition receives the largest index.
-#
-# This method needs to trigger a Spark job when this RDD contains
-# more than one partition.
-#
-# @param x An RDD to be zipped.
-# @return An RDD with zipped items.
-# @seealso zipWithUniqueId
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
-# collect(zipWithIndex(rdd))
-# # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
-#}
-# @rdname zipWithIndex
-# @aliases zipWithIndex,RDD
+#' Zip an RDD with its element indices.
+#'
+#' The ordering is first based on the partition index and then the
+#' ordering of items within each partition. So the first item in
+#' the first partition gets index 0, and the last item in the last
+#' partition receives the largest index.
+#'
+#' This method needs to trigger a Spark job when this RDD contains
+#' more than one partition.
+#'
+#' @param x An RDD to be zipped.
+#' @return An RDD with zipped items.
+#' @seealso zipWithUniqueId
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+#' collect(zipWithIndex(rdd))
+#' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
+#'}
+#' @rdname zipWithIndex
+#' @aliases zipWithIndex,RDD
+#' @noRd
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
@@ -1407,20 +1437,21 @@ setMethod("zipWithIndex",
lapplyPartitionsWithIndex(x, partitionFunc)
})
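The index assignment can be sketched in plain R; each partition's offset is the total size of the partitions before it, which is why more than one partition forces a Spark job to compute those sizes first.
```
# Plain-R sketch of zipWithIndex: offsets are the cumulative sizes of the
# preceding partitions.
partitions <- list(c("a", "b"), c("c", "d"), c("e"))
offsets <- cumsum(c(0, head(lengths(partitions), -1)))
Map(function(items, off) Map(list, items, off + seq_along(items) - 1),
    partitions, offsets)
# "a"->0, "b"->1, "c"->2, "d"->3, "e"->4, matching the example above
```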
-# Coalesce all elements within each partition of an RDD into a list.
-#
-# @param x An RDD.
-# @return An RDD created by coalescing all elements within
-# each partition into a list.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, as.list(1:4), 2L)
-# collect(glom(rdd))
-# # list(list(1, 2), list(3, 4))
-#}
-# @rdname glom
-# @aliases glom,RDD
+#' Coalesce all elements within each partition of an RDD into a list.
+#'
+#' @param x An RDD.
+#' @return An RDD created by coalescing all elements within
+#' each partition into a list.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, as.list(1:4), 2L)
+#' collect(glom(rdd))
+#' # list(list(1, 2), list(3, 4))
+#'}
+#' @rdname glom
+#' @aliases glom,RDD
+#' @noRd
setMethod("glom",
signature(x = "RDD"),
function(x) {
@@ -1433,21 +1464,22 @@ setMethod("glom",
############ Binary Functions #############
-# Return the union RDD of two RDDs.
-# The same as union() in Spark.
-#
-# @param x An RDD.
-# @param y An RDD.
-# @return a new RDD created by performing the simple union (witout removing
-# duplicates) of two input RDDs.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:3)
-# unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
-#}
-# @rdname unionRDD
-# @aliases unionRDD,RDD,RDD-method
+#' Return the union RDD of two RDDs.
+#' The same as union() in Spark.
+#'
+#' @param x An RDD.
+#' @param y An RDD.
+#' @return a new RDD created by performing the simple union (without removing
+#' duplicates) of two input RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
+#'}
+#' @rdname unionRDD
+#' @aliases unionRDD,RDD,RDD-method
+#' @noRd
setMethod("unionRDD",
signature(x = "RDD", y = "RDD"),
function(x, y) {
@@ -1464,27 +1496,28 @@ setMethod("unionRDD",
union.rdd
})
-# Zip an RDD with another RDD.
-#
-# Zips this RDD with another one, returning key-value pairs with the
-# first element in each RDD second element in each RDD, etc. Assumes
-# that the two RDDs have the same number of partitions and the same
-# number of elements in each partition (e.g. one was made through
-# a map on the other).
-#
-# @param x An RDD to be zipped.
-# @param other Another RDD to be zipped.
-# @return An RDD zipped from the two RDDs.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, 0:4)
-# rdd2 <- parallelize(sc, 1000:1004)
-# collect(zipRDD(rdd1, rdd2))
-# # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
-#}
-# @rdname zipRDD
-# @aliases zipRDD,RDD
+#' Zip an RDD with another RDD.
+#'
+#' Zips this RDD with another one, returning key-value pairs with the
+#' first element in each RDD, second element in each RDD, etc. Assumes
+#' that the two RDDs have the same number of partitions and the same
+#' number of elements in each partition (e.g. one was made through
+#' a map on the other).
+#'
+#' @param x An RDD to be zipped.
+#' @param other Another RDD to be zipped.
+#' @return An RDD zipped from the two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 0:4)
+#' rdd2 <- parallelize(sc, 1000:1004)
+#' collect(zipRDD(rdd1, rdd2))
+#' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
+#'}
+#' @rdname zipRDD
+#' @aliases zipRDD,RDD
+#' @noRd
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
@@ -1503,24 +1536,25 @@ setMethod("zipRDD",
mergePartitions(rdd, TRUE)
})
-# Cartesian product of this RDD and another one.
-#
-# Return the Cartesian product of this RDD and another one,
-# that is, the RDD of all pairs of elements (a, b) where a
-# is in this and b is in other.
-#
-# @param x An RDD.
-# @param other An RDD.
-# @return A new RDD which is the Cartesian product of these two RDDs.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:2)
-# sortByKey(cartesian(rdd, rdd))
-# # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
-#}
-# @rdname cartesian
-# @aliases cartesian,RDD,RDD-method
+#' Cartesian product of this RDD and another one.
+#'
+#' Return the Cartesian product of this RDD and another one,
+#' that is, the RDD of all pairs of elements (a, b) where a
+#' is in this and b is in other.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @return A new RDD which is the Cartesian product of these two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:2)
+#' sortByKey(cartesian(rdd, rdd))
+#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
+#'}
+#' @rdname cartesian
+#' @aliases cartesian,RDD,RDD-method
+#' @noRd
setMethod("cartesian",
signature(x = "RDD", other = "RDD"),
function(x, other) {
@@ -1533,24 +1567,25 @@ setMethod("cartesian",
mergePartitions(rdd, FALSE)
})
-# Subtract an RDD with another RDD.
-#
-# Return an RDD with the elements from this that are not in other.
-#
-# @param x An RDD.
-# @param other An RDD.
-# @param numPartitions Number of the partitions in the result RDD.
-# @return An RDD with the elements from this that are not in other.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
-# rdd2 <- parallelize(sc, list(2, 4))
-# collect(subtract(rdd1, rdd2))
-# # list(1, 1, 3)
-#}
-# @rdname subtract
-# @aliases subtract,RDD
+#' Subtract another RDD from this RDD.
+#'
+#' Return an RDD with the elements from this that are not in other.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions Number of the partitions in the result RDD.
+#' @return An RDD with the elements from this that are not in other.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
+#' rdd2 <- parallelize(sc, list(2, 4))
+#' collect(subtract(rdd1, rdd2))
+#' # list(1, 1, 3)
+#'}
+#' @rdname subtract
+#' @aliases subtract,RDD
+#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
@@ -1560,28 +1595,29 @@ setMethod("subtract",
keys(subtractByKey(rdd1, rdd2, numPartitions))
})
-# Intersection of this RDD and another one.
-#
-# Return the intersection of this RDD and another one.
-# The output will not contain any duplicate elements,
-# even if the input RDDs did. Performs a hash partition
-# across the cluster.
-# Note that this method performs a shuffle internally.
-#
-# @param x An RDD.
-# @param other An RDD.
-# @param numPartitions The number of partitions in the result RDD.
-# @return An RDD which is the intersection of these two RDDs.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
-# rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
-# collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
-# # list(1, 2, 3)
-#}
-# @rdname intersection
-# @aliases intersection,RDD
+#' Intersection of this RDD and another one.
+#'
+#' Return the intersection of this RDD and another one.
+#' The output will not contain any duplicate elements,
+#' even if the input RDDs did. Performs a hash partition
+#' across the cluster.
+#' Note that this method performs a shuffle internally.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions The number of partitions in the result RDD.
+#' @return An RDD which is the intersection of these two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
+#' # list(1, 2, 3)
+#'}
+#' @rdname intersection
+#' @aliases intersection,RDD
+#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
@@ -1597,26 +1633,27 @@ setMethod("intersection",
keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})
-# Zips an RDD's partitions with one (or more) RDD(s).
-# Same as zipPartitions in Spark.
-#
-# @param ... RDDs to be zipped.
-# @param func A function to transform zipped partitions.
-# @return A new RDD by applying a function to the zipped partitions.
-# Assumes that all the RDDs have the *same number of partitions*, but
-# does *not* require them to have the same number of elements in each partition.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
-# rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
-# rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
-# collect(zipPartitions(rdd1, rdd2, rdd3,
-# func = function(x, y, z) { list(list(x, y, z))} ))
-# # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
-#}
-# @rdname zipRDD
-# @aliases zipPartitions,RDD
+#' Zips an RDD's partitions with one (or more) RDD(s).
+#' Same as zipPartitions in Spark.
+#'
+#' @param ... RDDs to be zipped.
+#' @param func A function to transform zipped partitions.
+#' @return A new RDD by applying a function to the zipped partitions.
+#' Assumes that all the RDDs have the *same number of partitions*, but
+#' does *not* require them to have the same number of elements in each partition.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
+#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
+#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
+#' collect(zipPartitions(rdd1, rdd2, rdd3,
+#' func = function(x, y, z) { list(list(x, y, z))} ))
+#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
+#'}
+#' @rdname zipRDD
+#' @aliases zipPartitions,RDD
+#' @noRd
setMethod("zipPartitions",
"RDD",
function(..., func) {
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 1bf025cce4..fd013fdb30 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -144,7 +144,6 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
}
stopifnot(class(schema) == "structType")
- # schemaString <- tojson(schema)
jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
srdd <- callJMethod(jrdd, "rdd")
@@ -160,22 +159,21 @@ as.DataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
createDataFrame(sqlContext, data, schema, samplingRatio)
}
-# toDF
-#
-# Converts an RDD to a DataFrame by infer the types.
-#
-# @param x An RDD
-#
-# @rdname DataFrame
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# sqlContext <- sparkRSQL.init(sc)
-# rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
-# df <- toDF(rdd)
-# }
-
+#' toDF
+#'
+#' Converts an RDD to a DataFrame by inferring the types.
+#'
+#' @param x An RDD
+#'
+#' @rdname DataFrame
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
+#' df <- toDF(rdd)
+#'}
setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })
setMethod("toDF", signature(x = "RDD"),
@@ -217,23 +215,23 @@ jsonFile <- function(sqlContext, path) {
}
-# JSON RDD
-#
-# Loads an RDD storing one JSON object per string as a DataFrame.
-#
-# @param sqlContext SQLContext to use
-# @param rdd An RDD of JSON string
-# @param schema A StructType object to use as schema
-# @param samplingRatio The ratio of simpling used to infer the schema
-# @return A DataFrame
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# sqlContext <- sparkRSQL.init(sc)
-# rdd <- texFile(sc, "path/to/json")
-# df <- jsonRDD(sqlContext, rdd)
-# }
+#' JSON RDD
+#'
+#' Loads an RDD storing one JSON object per string as a DataFrame.
+#'
+#' @param sqlContext SQLContext to use
+#' @param rdd An RDD of JSON string
+#' @param schema A StructType object to use as schema
+#' @param samplingRatio The ratio of sampling used to infer the schema
+#' @return A DataFrame
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' rdd <- textFile(sc, "path/to/json")
+#' df <- jsonRDD(sqlContext, rdd)
+#'}
# TODO: support schema
jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 720990e1c6..471bec1eac 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -25,23 +25,23 @@ getMinPartitions <- function(sc, minPartitions) {
as.integer(minPartitions)
}
-# Create an RDD from a text file.
-#
-# This function reads a text file from HDFS, a local file system (available on all
-# nodes), or any Hadoop-supported file system URI, and creates an
-# RDD of strings from it.
-#
-# @param sc SparkContext to use
-# @param path Path of file to read. A vector of multiple paths is allowed.
-# @param minPartitions Minimum number of partitions to be created. If NULL, the default
-# value is chosen based on available parallelism.
-# @return RDD where each item is of type \code{character}
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# lines <- textFile(sc, "myfile.txt")
-#}
+#' Create an RDD from a text file.
+#'
+#' This function reads a text file from HDFS, a local file system (available on all
+#' nodes), or any Hadoop-supported file system URI, and creates an
+#' RDD of strings from it.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
+#' value is chosen based on available parallelism.
+#' @return RDD where each item is of type \code{character}
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' lines <- textFile(sc, "myfile.txt")
+#'}
textFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
@@ -53,23 +53,23 @@ textFile <- function(sc, path, minPartitions = NULL) {
RDD(jrdd, "string")
}
-# Load an RDD saved as a SequenceFile containing serialized objects.
-#
-# The file to be loaded should be one that was previously generated by calling
-# saveAsObjectFile() of the RDD class.
-#
-# @param sc SparkContext to use
-# @param path Path of file to read. A vector of multiple paths is allowed.
-# @param minPartitions Minimum number of partitions to be created. If NULL, the default
-# value is chosen based on available parallelism.
-# @return RDD containing serialized R objects.
-# @seealso saveAsObjectFile
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- objectFile(sc, "myfile")
-#}
+#' Load an RDD saved as a SequenceFile containing serialized objects.
+#'
+#' The file to be loaded should be one that was previously generated by calling
+#' saveAsObjectFile() of the RDD class.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
+#' value is chosen based on available parallelism.
+#' @return RDD containing serialized R objects.
+#' @seealso saveAsObjectFile
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- objectFile(sc, "myfile")
+#'}
objectFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path))
@@ -81,24 +81,24 @@ objectFile <- function(sc, path, minPartitions = NULL) {
RDD(jrdd, "byte")
}
-# Create an RDD from a homogeneous list or vector.
-#
-# This function creates an RDD from a local homogeneous list in R. The elements
-# in the list are split into \code{numSlices} slices and distributed to nodes
-# in the cluster.
-#
-# @param sc SparkContext to use
-# @param coll collection to parallelize
-# @param numSlices number of partitions to create in the RDD
-# @return an RDD created from this collection
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10, 2)
-# # The RDD should contain 10 elements
-# length(rdd)
-#}
+#' Create an RDD from a homogeneous list or vector.
+#'
+#' This function creates an RDD from a local homogeneous list in R. The elements
+#' in the list are split into \code{numSlices} slices and distributed to nodes
+#' in the cluster.
+#'
+#' @param sc SparkContext to use
+#' @param coll collection to parallelize
+#' @param numSlices number of partitions to create in the RDD
+#' @return an RDD created from this collection
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2)
+#' # The RDD should contain 10 elements
+#' length(rdd)
+#'}
parallelize <- function(sc, coll, numSlices = 1) {
# TODO: bound/safeguard numSlices
# TODO: unit tests for if the split works for all primitives
@@ -133,33 +133,32 @@ parallelize <- function(sc, coll, numSlices = 1) {
RDD(jrdd, "byte")
}
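A rough plain-R picture of how a local collection might be cut into numSlices slices before being shipped to the workers; the actual slicing logic in SparkR may differ.
```
# Plain-R sketch only: cut a collection into numSlices contiguous slices.
coll <- 1:10
numSlices <- 2
split(coll, cut(seq_along(coll), numSlices, labels = FALSE))
# $`1`: 1 2 3 4 5   $`2`: 6 7 8 9 10
```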
-# Include this specified package on all workers
-#
-# This function can be used to include a package on all workers before the
-# user's code is executed. This is useful in scenarios where other R package
-# functions are used in a function passed to functions like \code{lapply}.
-# NOTE: The package is assumed to be installed on every node in the Spark
-# cluster.
-#
-# @param sc SparkContext to use
-# @param pkg Package name
-#
-# @export
-# @examples
-#\dontrun{
-# library(Matrix)
-#
-# sc <- sparkR.init()
-# # Include the matrix library we will be using
-# includePackage(sc, Matrix)
-#
-# generateSparse <- function(x) {
-# sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
-# }
-#
-# rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
-# collect(rdd)
-#}
+#' Include this specified package on all workers
+#'
+#' This function can be used to include a package on all workers before the
+#' user's code is executed. This is useful in scenarios where other R package
+#' functions are used in a function passed to functions like \code{lapply}.
+#' NOTE: The package is assumed to be installed on every node in the Spark
+#' cluster.
+#'
+#' @param sc SparkContext to use
+#' @param pkg Package name
+#' @noRd
+#' @examples
+#'\dontrun{
+#' library(Matrix)
+#'
+#' sc <- sparkR.init()
+#' # Include the matrix library we will be using
+#' includePackage(sc, Matrix)
+#'
+#' generateSparse <- function(x) {
+#' sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
+#' }
+#'
+#' rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
+#' collect(rdd)
+#'}
includePackage <- function(sc, pkg) {
pkg <- as.character(substitute(pkg))
if (exists(".packages", .sparkREnv)) {
@@ -171,30 +170,30 @@ includePackage <- function(sc, pkg) {
.sparkREnv$.packages <- packages
}
-# @title Broadcast a variable to all workers
-#
-# @description
-# Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
-# object for reading it in distributed functions.
-#
-# @param sc Spark Context to use
-# @param object Object to be broadcast
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:2, 2L)
-#
-# # Large Matrix object that we want to broadcast
-# randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
-# randomMatBr <- broadcast(sc, randomMat)
-#
-# # Use the broadcast variable inside the function
-# useBroadcast <- function(x) {
-# sum(value(randomMatBr) * x)
-# }
-# sumRDD <- lapply(rdd, useBroadcast)
-#}
+#' @title Broadcast a variable to all workers
+#'
+#' @description
+#' Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
+#' object for reading it in distributed functions.
+#'
+#' @param sc Spark Context to use
+#' @param object Object to be broadcast
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:2, 2L)
+#'
+#' # Large Matrix object that we want to broadcast
+#' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
+#' randomMatBr <- broadcast(sc, randomMat)
+#'
+#' # Use the broadcast variable inside the function
+#' useBroadcast <- function(x) {
+#' sum(value(randomMatBr) * x)
+#' }
+#' sumRDD <- lapply(rdd, useBroadcast)
+#'}
broadcast <- function(sc, object) {
objName <- as.character(substitute(object))
serializedObj <- serialize(object, connection = NULL)
@@ -205,21 +204,21 @@ broadcast <- function(sc, object) {
Broadcast(id, object, jBroadcast, objName)
}
-# @title Set the checkpoint directory
-#
-# Set the directory under which RDDs are going to be checkpointed. The
-# directory must be a HDFS path if running on a cluster.
-#
-# @param sc Spark Context to use
-# @param dirName Directory path
-# @export
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# setCheckpointDir(sc, "~/checkpoint")
-# rdd <- parallelize(sc, 1:2, 2L)
-# checkpoint(rdd)
-#}
+#' @title Set the checkpoint directory
+#'
+#' Set the directory under which RDDs are going to be checkpointed. The
+#' directory must be a HDFS path if running on a cluster.
+#'
+#' @param sc Spark Context to use
+#' @param dirName Directory path
+#' @noRd
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setCheckpointDir(sc, "~/checkpoint")
+#' rdd <- parallelize(sc, 1:2, 2L)
+#' checkpoint(rdd)
+#'}
setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 92ad4ee868..612e639f8a 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -88,12 +88,8 @@ setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") })
# @export
setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") })
-# @rdname foreach
-# @export
setGeneric("foreach", function(x, func) { standardGeneric("foreach") })
-# @rdname foreach
-# @export
setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachPartition") })
# The jrdd accessor function.
@@ -107,27 +103,17 @@ setGeneric("glom", function(x) { standardGeneric("glom") })
# @export
setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") })
-# @rdname lapplyPartition
-# @export
setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") })
-# @rdname lapplyPartitionsWithIndex
-# @export
setGeneric("lapplyPartitionsWithIndex",
function(X, FUN) {
standardGeneric("lapplyPartitionsWithIndex")
})
-# @rdname lapply
-# @export
setGeneric("map", function(X, FUN) { standardGeneric("map") })
-# @rdname lapplyPartition
-# @export
setGeneric("mapPartitions", function(X, FUN) { standardGeneric("mapPartitions") })
-# @rdname lapplyPartitionsWithIndex
-# @export
setGeneric("mapPartitionsWithIndex",
function(X, FUN) { standardGeneric("mapPartitionsWithIndex") })
@@ -563,12 +549,8 @@ setGeneric("summarize", function(x,...) { standardGeneric("summarize") })
#' @export
setGeneric("summary", function(object, ...) { standardGeneric("summary") })
-# @rdname tojson
-# @export
setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
-#' @rdname DataFrame
-#' @export
setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
#' @rdname unionAll
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index 199c3fd6ab..991bea4d20 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -21,23 +21,24 @@ NULL
############ Actions and Transformations ############
-# Look up elements of a key in an RDD
-#
-# @description
-# \code{lookup} returns a list of values in this RDD for key key.
-#
-# @param x The RDD to collect
-# @param key The key to look up for
-# @return a list of values in this RDD for key key
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(c(1, 1), c(2, 2), c(1, 3))
-# rdd <- parallelize(sc, pairs)
-# lookup(rdd, 1) # list(1, 3)
-#}
-# @rdname lookup
-# @aliases lookup,RDD-method
+#' Look up elements of a key in an RDD
+#'
+#' @description
+#' \code{lookup} returns a list of values in this RDD for the given key.
+#'
+#' @param x The RDD to look up the key in
+#' @param key The key to look up
+#' @return A list of values in this RDD for the given key
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(c(1, 1), c(2, 2), c(1, 3))
+#' rdd <- parallelize(sc, pairs)
+#' lookup(rdd, 1) # list(1, 3)
+#'}
+#' @rdname lookup
+#' @aliases lookup,RDD-method
+#' @noRd
setMethod("lookup",
signature(x = "RDD", key = "ANY"),
function(x, key) {
@@ -49,21 +50,22 @@ setMethod("lookup",
collect(valsRDD)
})
-# Count the number of elements for each key, and return the result to the
-# master as lists of (key, count) pairs.
-#
-# Same as countByKey in Spark.
-#
-# @param x The RDD to count keys.
-# @return list of (key, count) pairs, where count is number of each key in rdd.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
-# countByKey(rdd) # ("a", 2L), ("b", 1L)
-#}
-# @rdname countByKey
-# @aliases countByKey,RDD-method
+#' Count the number of elements for each key, and return the result to the
+#' master as lists of (key, count) pairs.
+#'
+#' Same as countByKey in Spark.
+#'
+#' @param x The RDD to count keys.
+#' @return list of (key, count) pairs, where count is the number of times each key appears in the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
+#' countByKey(rdd) # ("a", 2L), ("b", 1L)
+#'}
+#' @rdname countByKey
+#' @aliases countByKey,RDD-method
+#' @noRd
setMethod("countByKey",
signature(x = "RDD"),
function(x) {
@@ -71,17 +73,18 @@ setMethod("countByKey",
countByValue(keys)
})
-# Return an RDD with the keys of each tuple.
-#
-# @param x The RDD from which the keys of each tuple is returned.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-# collect(keys(rdd)) # list(1, 3)
-#}
-# @rdname keys
-# @aliases keys,RDD
+#' Return an RDD with the keys of each tuple.
+#'
+#' @param x The RDD from which the keys of each tuple is returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+#' collect(keys(rdd)) # list(1, 3)
+#'}
+#' @rdname keys
+#' @aliases keys,RDD
+#' @noRd
setMethod("keys",
signature(x = "RDD"),
function(x) {
@@ -91,17 +94,18 @@ setMethod("keys",
lapply(x, func)
})
-# Return an RDD with the values of each tuple.
-#
-# @param x The RDD from which the values of each tuple is returned.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
-# collect(values(rdd)) # list(2, 4)
-#}
-# @rdname values
-# @aliases values,RDD
+#' Return an RDD with the values of each tuple.
+#'
+#' @param x The RDD from which the values of each tuple is returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+#' collect(values(rdd)) # list(2, 4)
+#'}
+#' @rdname values
+#' @aliases values,RDD
+#' @noRd
setMethod("values",
signature(x = "RDD"),
function(x) {
@@ -111,23 +115,24 @@ setMethod("values",
lapply(x, func)
})
-# Applies a function to all values of the elements, without modifying the keys.
-#
-# The same as `mapValues()' in Spark.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on the value of each element.
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:10)
-# makePairs <- lapply(rdd, function(x) { list(x, x) })
-# collect(mapValues(makePairs, function(x) { x * 2) })
-# Output: list(list(1,2), list(2,4), list(3,6), ...)
-#}
-# @rdname mapValues
-# @aliases mapValues,RDD,function-method
+#' Applies a function to all values of the elements, without modifying the keys.
+#'
+#' The same as `mapValues()' in Spark.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on the value of each element.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' makePairs <- lapply(rdd, function(x) { list(x, x) })
+#' collect(mapValues(makePairs, function(x) { x * 2 }))
+#' # Output: list(list(1,2), list(2,4), list(3,6), ...)
+#'}
+#' @rdname mapValues
+#' @aliases mapValues,RDD,function-method
+#' @noRd
setMethod("mapValues",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -137,23 +142,24 @@ setMethod("mapValues",
lapply(X, func)
})
-# Pass each value in the key-value pair RDD through a flatMap function without
-# changing the keys; this also retains the original RDD's partitioning.
-#
-# The same as 'flatMapValues()' in Spark.
-#
-# @param X The RDD to apply the transformation.
-# @param FUN the transformation to apply on the value of each element.
-# @return a new RDD created by the transformation.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
-# collect(flatMapValues(rdd, function(x) { x }))
-# Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
-#}
-# @rdname flatMapValues
-# @aliases flatMapValues,RDD,function-method
+#' Pass each value in the key-value pair RDD through a flatMap function without
+#' changing the keys; this also retains the original RDD's partitioning.
+#'
+#' The same as 'flatMapValues()' in Spark.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on the value of each element.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
+#' collect(flatMapValues(rdd, function(x) { x }))
+#' # Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
+#'}
+#' @rdname flatMapValues
+#' @aliases flatMapValues,RDD,function-method
+#' @noRd
setMethod("flatMapValues",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
@@ -165,38 +171,34 @@ setMethod("flatMapValues",
############ Shuffle Functions ############
-# Partition an RDD by key
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# For each element of this RDD, the partitioner is used to compute a hash
-# function and the RDD is partitioned using this hash value.
-#
-# @param x The RDD to partition. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param numPartitions Number of partitions to create.
-# @param ... Other optional arguments to partitionBy.
-#
-# @param partitionFunc The partition function to use. Uses a default hashCode
-# function if not provided
-# @return An RDD partitioned using the specified partitioner.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- partitionBy(rdd, 2L)
-# collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
-#}
-# @rdname partitionBy
-# @aliases partitionBy,RDD,integer-method
+#' Partition an RDD by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
+#' For each element of this RDD, the partitioner is used to compute a hash
+#' value for the key, and the RDD is partitioned using this hash value.
+#'
+#' @param x The RDD to partition. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @param ... Other optional arguments to partitionBy.
+#'
+#' @param partitionFunc The partition function to use. Uses a default hashCode
+#' function if not provided
+#' @return An RDD partitioned using the specified partitioner.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- partitionBy(rdd, 2L)
+#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
+#'}
+#' @rdname partitionBy
+#' @aliases partitionBy,RDD,integer-method
+#' @noRd
setMethod("partitionBy",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, partitionFunc = hashCode) {
-
- #if (missing(partitionFunc)) {
- # partitionFunc <- hashCode
- #}
-
partitionFunc <- cleanClosure(partitionFunc)
serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)
@@ -233,27 +235,28 @@ setMethod("partitionBy",
RDD(r, serializedMode = "byte")
})
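The hash-partitioning idea can be sketched in plain R with a toy hash function; the function below is a stand-in for illustration, not SparkR's hashCode.
```
# Plain-R sketch: the partition of a key is hash(key) %% numPartitions.
toyHash <- function(key) sum(utf8ToInt(as.character(key)))
numPartitions <- 2L
keys <- list(1, 1.1, 1)
vapply(keys, function(k) toyHash(k) %% numPartitions, numeric(1))
# keys with equal hash values land in the same partition
```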
-# Group values by key
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# and group values for each key in the RDD into a single sequence.
-#
-# @param x The RDD to group. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param numPartitions Number of partitions to create.
-# @return An RDD where each element is list(K, list(V))
-# @seealso reduceByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- groupByKey(rdd, 2L)
-# grouped <- collect(parts)
-# grouped[[1]] # Should be a list(1, list(2, 4))
-#}
-# @rdname groupByKey
-# @aliases groupByKey,RDD,integer-method
+#' Group values by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and groups values for each key in the RDD into a single sequence.
+#'
+#' @param x The RDD to group. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, list(V))
+#' @seealso reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- groupByKey(rdd, 2L)
+#' grouped <- collect(parts)
+#' grouped[[1]] # Should be a list(1, list(2, 4))
+#'}
+#' @rdname groupByKey
+#' @aliases groupByKey,RDD,integer-method
+#' @noRd
setMethod("groupByKey",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
@@ -291,28 +294,29 @@ setMethod("groupByKey",
lapplyPartition(shuffled, groupVals)
})
-# Merge values by key
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# and merges the values for each key using an associative reduce function.
-#
-# @param x The RDD to reduce by key. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param combineFunc The associative reduce function to use.
-# @param numPartitions Number of partitions to create.
-# @return An RDD where each element is list(K, V') where V' is the merged
-# value
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- reduceByKey(rdd, "+", 2L)
-# reduced <- collect(parts)
-# reduced[[1]] # Should be a list(1, 6)
-#}
-# @rdname reduceByKey
-# @aliases reduceByKey,RDD,integer-method
+#' Merge values by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function.
+#'
+#' @param x The RDD to reduce by key. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, V') where V' is the merged
+#' value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- reduceByKey(rdd, "+", 2L)
+#' reduced <- collect(parts)
+#' reduced[[1]] # Should be a list(1, 6)
+#'}
+#' @rdname reduceByKey
+#' @aliases reduceByKey,RDD,integer-method
+#' @noRd
setMethod("reduceByKey",
signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
function(x, combineFunc, numPartitions) {
@@ -332,27 +336,28 @@ setMethod("reduceByKey",
lapplyPartition(shuffled, reduceVals)
})
-# Merge values by key locally
-#
-# This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-# and merges the values for each key using an associative reduce function, but return the
-# results immediately to the driver as an R list.
-#
-# @param x The RDD to reduce by key. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param combineFunc The associative reduce function to use.
-# @return A list of elements of type list(K, V') where V' is the merged value for each key
-# @seealso reduceByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# reduced <- reduceByKeyLocally(rdd, "+")
-# reduced # list(list(1, 6), list(1.1, 3))
-#}
-# @rdname reduceByKeyLocally
-# @aliases reduceByKeyLocally,RDD,integer-method
+#' Merge values by key locally
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function, but returns the
+#' results immediately to the driver as an R list.
+#'
+#' @param x The RDD to reduce by key. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @return A list of elements of type list(K, V') where V' is the merged value for each key
+#' @seealso reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' reduced <- reduceByKeyLocally(rdd, "+")
+#' reduced # list(list(1, 6), list(1.1, 3))
+#'}
+#' @rdname reduceByKeyLocally
+#' @aliases reduceByKeyLocally,RDD,integer-method
+#' @noRd
setMethod("reduceByKeyLocally",
signature(x = "RDD", combineFunc = "ANY"),
function(x, combineFunc) {
@@ -384,41 +389,40 @@ setMethod("reduceByKeyLocally",
convertEnvsToList(merged[[1]], merged[[2]])
})
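The local result for the example above can be reproduced in plain R: values are merged per key with the reduce function and returned as a local structure.
```
# Plain-R sketch of reduceByKeyLocally's result for the example above.
pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
keys <- vapply(pairs, function(p) p[[1]], numeric(1))
vals <- vapply(pairs, function(p) p[[2]], numeric(1))
tapply(vals, keys, function(v) Reduce(`+`, v))
#   1 1.1
#   6   3
```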
-# Combine values by key
-#
-# Generic function to combine the elements for each key using a custom set of
-# aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
-# for a "combined type" C. Note that V and C can be different -- for example, one
-# might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
-
-# Users provide three functions:
-# \itemize{
-# \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
-# \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list) -
-# \item mergeCombiners, to combine two C's into a single one (e.g., concatentates
-# two lists).
-# }
-#
-# @param x The RDD to combine. Should be an RDD where each element is
-# list(K, V) or c(K, V).
-# @param createCombiner Create a combiner (C) given a value (V)
-# @param mergeValue Merge the given value (V) with an existing combiner (C)
-# @param mergeCombiners Merge two combiners and return a new combiner
-# @param numPartitions Number of partitions to create.
-# @return An RDD where each element is list(K, C) where C is the combined type
-#
-# @seealso groupByKey, reduceByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
-# rdd <- parallelize(sc, pairs)
-# parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
-# combined <- collect(parts)
-# combined[[1]] # Should be a list(1, 6)
-#}
-# @rdname combineByKey
-# @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
+#' Combine values by key
+#'
+#' Generic function to combine the elements for each key using a custom set of
+#' aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
+#' for a "combined type" C. Note that V and C can be different -- for example, one
+#' might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
+#' Users provide three functions:
+#' \itemize{
+#' \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
+#' \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
+#' \item mergeCombiners, to combine two C's into a single one (e.g., concatenates
+#' two lists).
+#' }
+#'
+#' @param x The RDD to combine. Should be an RDD where each element is
+#' list(K, V) or c(K, V).
+#' @param createCombiner Create a combiner (C) given a value (V)
+#' @param mergeValue Merge the given value (V) with an existing combiner (C)
+#' @param mergeCombiners Merge two combiners and return a new combiner
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, C) where C is the combined type
+#' @seealso groupByKey, reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
+#' combined <- collect(parts)
+#' combined[[1]] # Should be a list(1, 6)
+#'}
+#' @rdname combineByKey
+#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
+#' @noRd
setMethod("combineByKey",
signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
mergeCombiners = "ANY", numPartitions = "numeric"),
@@ -450,36 +454,37 @@ setMethod("combineByKey",
lapplyPartition(shuffled, mergeAfterShuffle)
})
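A plain-R sketch of the single-partition behaviour for the example above; mergeCombiners only comes into play when results from different partitions are merged.
```
# Plain-R sketch of combineByKey within one partition, using the functions
# from the example above (createCombiner = identity, mergeValue = "+").
pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
createCombiner <- function(v) v
mergeValue <- `+`
combined <- list()
for (p in pairs) {
  k <- as.character(p[[1]])
  combined[[k]] <- if (is.null(combined[[k]])) createCombiner(p[[2]])
                   else mergeValue(combined[[k]], p[[2]])
}
combined  # $`1` is 6 and $`1.1` is 3, as in the example above
```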
-# Aggregate a pair RDD by each key.
-#
-# Aggregate the values of each key in an RDD, using given combine functions
-# and a neutral "zero value". This function can return a different result type,
-# U, than the type of the values in this RDD, V. Thus, we need one operation
-# for merging a V into a U and one operation for merging two U's, The former
-# operation is used for merging values within a partition, and the latter is
-# used for merging values between partitions. To avoid memory allocation, both
-# of these functions are allowed to modify and return their first argument
-# instead of creating a new U.
-#
-# @param x An RDD.
-# @param zeroValue A neutral "zero value".
-# @param seqOp A function to aggregate the values of each key. It may return
-# a different result type from the type of the values.
-# @param combOp A function to aggregate results of seqOp.
-# @return An RDD containing the aggregation result.
-# @seealso foldByKey, combineByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-# zeroValue <- list(0, 0)
-# seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
-# combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
-# aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
-# # list(list(1, list(3, 2)), list(2, list(7, 2)))
-#}
-# @rdname aggregateByKey
-# @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
+#' Aggregate a pair RDD by each key.
+#'
+#' Aggregate the values of each key in an RDD, using given combine functions
+#' and a neutral "zero value". This function can return a different result type,
+#' U, than the type of the values in this RDD, V. Thus, we need one operation
+#' for merging a V into a U and one operation for merging two U's, The former
+#' operation is used for merging values within a partition, and the latter is
+#' used for merging values between partitions. To avoid memory allocation, both
+#' of these functions are allowed to modify and return their first argument
+#' instead of creating a new U.
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param seqOp A function to aggregate the values of each key. It may return
+#' a different result type from the type of the values.
+#' @param combOp A function to aggregate results of seqOp.
+#' @return An RDD containing the aggregation result.
+#' @seealso foldByKey, combineByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+#' zeroValue <- list(0, 0)
+#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
+#'}
+#' @rdname aggregateByKey
+#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
+#' @noRd
setMethod("aggregateByKey",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
combOp = "ANY", numPartitions = "numeric"),
@@ -491,26 +496,27 @@ setMethod("aggregateByKey",
combineByKey(x, createCombiner, seqOp, combOp, numPartitions)
})
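As a follow-on sketch to the aggregateByKey example above (same assumed context and the same `rdd`), the (sum, count) accumulators can be turned into per-key means with mapValues:
```
sumCounts <- aggregateByKey(rdd, list(0, 0),
                            function(x, y) list(x[[1]] + y, x[[2]] + 1),
                            function(x, y) list(x[[1]] + y[[1]], x[[2]] + y[[2]]),
                            2L)
# Divide each key's running sum by its count to get the per-key mean
means <- mapValues(sumCounts, function(p) p[[1]] / p[[2]])
collect(means)  # list(list(1, 1.5), list(2, 3.5)), order may vary
```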
-# Fold a pair RDD by each key.
-#
-# Aggregate the values of each key in an RDD, using an associative function "func"
-# and a neutral "zero value" which may be added to the result an arbitrary
-# number of times, and must not change the result (e.g., 0 for addition, or
-# 1 for multiplication.).
-#
-# @param x An RDD.
-# @param zeroValue A neutral "zero value".
-# @param func An associative function for folding values of each key.
-# @return An RDD containing the aggregation result.
-# @seealso aggregateByKey, combineByKey
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-# foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
-#}
-# @rdname foldByKey
-# @aliases foldByKey,RDD,ANY,ANY,integer-method
+#' Fold a pair RDD by each key.
+#'
+#' Aggregate the values of each key in an RDD, using an associative function "func"
+#' and a neutral "zero value" which may be added to the result an arbitrary
+#' number of times, and must not change the result (e.g., 0 for addition, or
+#' 1 for multiplication).
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param func An associative function for folding values of each key.
+#' @return An RDD containing the aggregation result.
+#' @seealso aggregateByKey, combineByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
+#'}
+#' @rdname foldByKey
+#' @aliases foldByKey,RDD,ANY,ANY,integer-method
+#' @noRd
setMethod("foldByKey",
signature(x = "RDD", zeroValue = "ANY",
func = "ANY", numPartitions = "numeric"),
@@ -520,28 +526,29 @@ setMethod("foldByKey",
############ Binary Functions #############
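A small sketch for the foldByKey documentation above, assuming the same context; the zero value must be neutral for the chosen function (0 for "+", 1 for "*"):
```
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
collect(foldByKey(rdd, 0, "+", 2L))  # list(list(1, 3), list(2, 7))
collect(foldByKey(rdd, 1, "*", 2L))  # list(list(1, 2), list(2, 12))
```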
-# Join two RDDs
-#
-# @description
-# \code{join} This function joins two RDDs where every element is of the form list(K, V).
-# The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return a new RDD containing all pairs of elements with matching keys in
-# two input RDDs.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3))
-#}
-# @rdname join-methods
-# @aliases join,RDD,RDD-method
+#' Join two RDDs
+#'
+#' @description
+#' \code{join} This function joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return a new RDD containing all pairs of elements with matching keys in
+#' two input RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)))
+#'}
+#' @rdname join-methods
+#' @aliases join,RDD,RDD-method
+#' @noRd
setMethod("join",
signature(x = "RDD", y = "RDD"),
function(x, y, numPartitions) {
@@ -556,30 +563,31 @@ setMethod("join",
doJoin)
})
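A minimal sketch of collecting the inner join documented above, under the same assumed context:
```
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
# Key 2 appears only in rdd1, so the inner join drops it
collect(join(rdd1, rdd2, 2L))  # list(list(1, list(1, 2)), list(1, list(1, 3)))
```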
-# Left outer join two RDDs
-#
-# @description
-# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of
-# the form list(K, V). The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return For each element (k, v) in x, the resulting RDD will either contain
-# all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
-# if no elements in rdd2 have key k.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# leftOuterJoin(rdd1, rdd2, 2L)
-# # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
-#}
-# @rdname join-methods
-# @aliases leftOuterJoin,RDD,RDD-method
+#' Left outer join two RDDs
+#'
+#' @description
+#' \code{leftOuterJoin} This function left-outer-joins two RDDs where every element is of
+#' the form list(K, V). The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, v) in x, the resulting RDD will either contain
+#' all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
+#' if no elements in rdd2 have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' leftOuterJoin(rdd1, rdd2, 2L)
+#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
+#'}
+#' @rdname join-methods
+#' @aliases leftOuterJoin,RDD,RDD-method
+#' @noRd
setMethod("leftOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -593,30 +601,31 @@ setMethod("leftOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
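The same two RDDs from the join sketch above, left-outer-joined (a sketch under the same assumptions):
```
# Key 2 has no match in rdd2, so its value is paired with NULL instead of being dropped
collect(leftOuterJoin(rdd1, rdd2, 2L))
# list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
```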
-# Right outer join two RDDs
-#
-# @description
-# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of
-# the form list(K, V). The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return For each element (k, w) in y, the resulting RDD will either contain
-# all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
-# if no elements in x have key k.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rightOuterJoin(rdd1, rdd2, 2L)
-# # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
-#}
-# @rdname join-methods
-# @aliases rightOuterJoin,RDD,RDD-method
+#' Right outer join two RDDs
+#'
+#' @description
+#' \code{rightOuterJoin} This function right-outer-joins two RDDs where every element is of
+#' the form list(K, V). The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, w) in y, the resulting RDD will either contain
+#' all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
+#' if no elements in x have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rightOuterJoin(rdd1, rdd2, 2L)
+#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
+#'}
+#' @rdname join-methods
+#' @aliases rightOuterJoin,RDD,RDD-method
+#' @noRd
setMethod("rightOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -630,33 +639,34 @@ setMethod("rightOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
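A sketch of the right outer join on the data from the documented example above (same assumed context):
```
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
# Key 2 exists only in rdd2 (the right side), so x contributes NULL for it
collect(rightOuterJoin(rdd1, rdd2, 2L))
# list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
```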
-# Full outer join two RDDs
-#
-# @description
-# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of
-# the form list(K, V). The key types of the two RDDs should be the same.
-#
-# @param x An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param y An RDD to be joined. Should be an RDD where each element is
-# list(K, V).
-# @param numPartitions Number of partitions to create.
-# @return For each element (k, v) in x and (k, w) in y, the resulting RDD
-# will contain all pairs (k, (v, w)) for both (k, v) in x and
-# (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
-# in x/y have key k.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
-# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
-# # list(1, list(3, 1)),
-# # list(2, list(NULL, 4)))
-# # list(3, list(3, NULL)),
-#}
-# @rdname join-methods
-# @aliases fullOuterJoin,RDD,RDD-method
+#' Full outer join two RDDs
+#'
+#' @description
+#' \code{fullOuterJoin} This function full-outer-joins two RDDs where every element is of
+#' the form list(K, V). The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#' list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD
+#' will contain all pairs (k, (v, w)) for both (k, v) in x and
+#' (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
+#' in x/y have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
+#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
+#'                               #      list(1, list(3, 1)),
+#'                               #      list(2, list(NULL, 4)),
+#'                               #      list(3, list(3, NULL)))
+#'}
+#' @rdname join-methods
+#' @aliases fullOuterJoin,RDD,RDD-method
+#' @noRd
setMethod("fullOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
@@ -670,23 +680,24 @@ setMethod("fullOuterJoin",
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})
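A sketch collecting the full outer join from the documented example, assuming the same context:
```
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
# Unmatched keys on either side are kept, padded with NULL on the missing side
collect(fullOuterJoin(rdd1, rdd2, 2L))
# list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
```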
-# For each key k in several RDDs, return a resulting RDD that
-# whose values are a list of values for the key in all RDDs.
-#
-# @param ... Several RDDs.
-# @param numPartitions Number of partitions to create.
-# @return a new RDD containing all pairs of elements with values in a list
-# in all RDDs.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
-# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
-# cogroup(rdd1, rdd2, numPartitions = 2L)
-# # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list()))
-#}
-# @rdname cogroup
-# @aliases cogroup,RDD-method
+#' For each key k in several RDDs, return a resulting RDD
+#' whose values are a list of the values for that key in all RDDs.
+#'
+#' @param ... Several RDDs.
+#' @param numPartitions Number of partitions to create.
+#' @return a new RDD containing all pairs of elements with values in a list
+#' in all RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' cogroup(rdd1, rdd2, numPartitions = 2L)
+#' # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list())))
+#'}
+#' @rdname cogroup
+#' @aliases cogroup,RDD-method
+#' @noRd
setMethod("cogroup",
"RDD",
function(..., numPartitions) {
@@ -722,20 +733,21 @@ setMethod("cogroup",
group.func)
})
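A sketch of cogroup on the same two small RDDs used in the join examples; each key maps to one list of values per input RDD (same assumed context):
```
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
grouped <- cogroup(rdd1, rdd2, numPartitions = 2L)
# Key 1 collects values from both RDDs; key 2 gets an empty list for rdd2
collect(grouped)
```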
-# Sort a (k, v) pair RDD by k.
-#
-# @param x A (k, v) pair RDD to be sorted.
-# @param ascending A flag to indicate whether the sorting is ascending or descending.
-# @param numPartitions Number of partitions to create.
-# @return An RDD where all (k, v) pair elements are sorted.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
-# collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
-#}
-# @rdname sortByKey
-# @aliases sortByKey,RDD,RDD-method
+#' Sort a (k, v) pair RDD by k.
+#'
+#' @param x A (k, v) pair RDD to be sorted.
+#' @param ascending A flag to indicate whether the sorting is ascending or descending.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where all (k, v) pair elements are sorted.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
+#' collect(sortByKey(rdd)) # list(list(1, 3), list(2, 2), list(3, 1))
+#'}
+#' @rdname sortByKey
+#' @aliases sortByKey,RDD,RDD-method
+#' @noRd
setMethod("sortByKey",
signature(x = "RDD"),
function(x, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
@@ -784,25 +796,26 @@ setMethod("sortByKey",
lapplyPartition(newRDD, partitionFunc)
})
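A short sketch of ascending and descending sorts, assuming the same context as above:
```
rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
collect(sortByKey(rdd))                     # list(list(1, 3), list(2, 2), list(3, 1))
collect(sortByKey(rdd, ascending = FALSE))  # list(list(3, 1), list(2, 2), list(1, 3))
```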
-# Subtract a pair RDD with another pair RDD.
-#
-# Return an RDD with the pairs from x whose keys are not in other.
-#
-# @param x An RDD.
-# @param other An RDD.
-# @param numPartitions Number of the partitions in the result RDD.
-# @return An RDD with the pairs from x whose keys are not in other.
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
-# list("b", 5), list("a", 2)))
-# rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
-# collect(subtractByKey(rdd1, rdd2))
-# # list(list("b", 4), list("b", 5))
-#}
-# @rdname subtractByKey
-# @aliases subtractByKey,RDD
+#' Subtract one pair RDD from another pair RDD by key.
+#'
+#' Return an RDD with the pairs from x whose keys are not in other.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions Number of the partitions in the result RDD.
+#' @return An RDD with the pairs from x whose keys are not in other.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
+#' list("b", 5), list("a", 2)))
+#' rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
+#' collect(subtractByKey(rdd1, rdd2))
+#' # list(list("b", 4), list("b", 5))
+#'}
+#' @rdname subtractByKey
+#' @aliases subtractByKey,RDD
+#' @noRd
setMethod("subtractByKey",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
@@ -818,41 +831,42 @@ setMethod("subtractByKey",
function (v) { v[[1]] })
})
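A sketch of subtractByKey on the documented example data, assuming the same context:
```
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4), list("b", 5), list("a", 2)))
rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
# Every pair whose key appears in rdd2 ("a") is removed; both "b" pairs survive
collect(subtractByKey(rdd1, rdd2))  # list(list("b", 4), list("b", 5))
```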
-# Return a subset of this RDD sampled by key.
-#
-# @description
-# \code{sampleByKey} Create a sample of this RDD using variable sampling rates
-# for different keys as specified by fractions, a key to sampling rate map.
-#
-# @param x The RDD to sample elements by key, where each element is
-# list(K, V) or c(K, V).
-# @param withReplacement Sampling with replacement or not
-# @param fraction The (rough) sample target fraction
-# @param seed Randomness seed value
-# @examples
-#\dontrun{
-# sc <- sparkR.init()
-# rdd <- parallelize(sc, 1:3000)
-# pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
-# else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
-# fractions <- list(a = 0.2, b = 0.1, c = 0.3)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
-# 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
-# 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
-# 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
-# lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
-# lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
-# lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
-# lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
-# lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
-# lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
-# fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
-# fractions <- list(a = 0.2, b = 0.1)
-# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
-#}
-# @rdname sampleByKey
-# @aliases sampleByKey,RDD-method
+#' Return a subset of this RDD sampled by key.
+#'
+#' @description
+#' \code{sampleByKey} Create a sample of this RDD using variable sampling rates
+#' for different keys as specified by fractions, a key to sampling rate map.
+#'
+#' @param x The RDD to sample elements by key, where each element is
+#' list(K, V) or c(K, V).
+#' @param withReplacement Sampling with replacement or not
+#' @param fraction The (rough) sample target fraction
+#' @param seed Randomness seed value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3000)
+#' pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
+#' else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
+#' fractions <- list(a = 0.2, b = 0.1, c = 0.3)
+#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
+#' 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
+#' 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
+#' 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
+#' lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
+#' lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
+#' lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
+#' lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
+#' lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
+#' lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
+#' fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
+#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
+#' fractions <- list(a = 0.2, b = 0.1)
+#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
+#'}
+#' @rdname sampleByKey
+#' @aliases sampleByKey,RDD-method
+#' @noRd
setMethod("sampleByKey",
signature(x = "RDD", withReplacement = "logical",
fractions = "vector", seed = "integer"),
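A compact sketch of per-key sampling with a fractions map for the sampleByKey docs above, assuming the same context; the resulting counts are approximate because sampling is randomized:
```
rdd <- parallelize(sc, 1:3000)
pairs <- lapply(rdd, function(x) {
  if (x %% 3 == 0) list("a", x) else if (x %% 3 == 1) list("b", x) else list("c", x)
})
fractions <- list(a = 0.2, b = 0.1, c = 0.3)
sampled <- sampleByKey(pairs, FALSE, fractions, 1618L)
length(lookup(sampled, "a"))  # roughly 0.2 * 1000
```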
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 004d08e74e..ebe2b2b8dc 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -34,7 +34,6 @@ connExists <- function(env) {
sparkR.stop <- function() {
env <- .sparkREnv
if (exists(".sparkRCon", envir = env)) {
- # cat("Stopping SparkR\n")
if (exists(".sparkRjsc", envir = env)) {
sc <- get(".sparkRjsc", envir = env)
callJMethod(sc, "stop")
@@ -78,7 +77,7 @@ sparkR.stop <- function() {
#' Initialize a new Spark Context.
#'
#' This function initializes a new SparkContext. For details on how to initialize
-#' and use SparkR, refer to SparkR programming guide at
+#' and use SparkR, refer to the SparkR programming guide at
#' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext}.
#'
#' @param master The Spark master URL.
diff --git a/R/pkg/inst/profile/shell.R b/R/pkg/inst/profile/shell.R
index 7189f1a260..90a3761e41 100644
--- a/R/pkg/inst/profile/shell.R
+++ b/R/pkg/inst/profile/shell.R
@@ -38,7 +38,7 @@
if (nchar(sparkVer) == 0) {
cat("\n")
} else {
- cat(" version ", sparkVer, "\n")
+ cat(" version ", sparkVer, "\n")
}
cat(" /_/", "\n")
cat("\n")