author    Timothy Hunter <timhunter@databricks.com>  2016-04-28 22:42:48 -0700
committer Xiangrui Meng <meng@databricks.com>        2016-04-28 22:42:48 -0700
commit    769a909d1357766a441ff69e6e98c22c51b12c93 (patch)
tree      d176f05a13eec69224cf9e084706dd4fac9da1e8
parent    4607f6e7f7b174c62700f1fe542f77af3203b096 (diff)
[SPARK-7264][ML] Parallel lapply for sparkR
## What changes were proposed in this pull request?

This PR adds a new function in SparkR called `sparkLapply(list, function)`. This function implements a distributed version of `lapply` using Spark as a backend.

TODO:
- [x] check documentation
- [ ] check tests

Trivial example in SparkR:

```R
sparkLapply(1:5, function(x) { 2 * x })
```

Output:

```
[[1]]
[1] 2

[[2]]
[1] 4

[[3]]
[1] 6

[[4]]
[1] 8

[[5]]
[1] 10
```

Here is a slightly more complex example to perform distributed training of multiple models. Under the hood, Spark broadcasts the dataset.

```R
library("MASS")
data(menarche)
families <- c("gaussian", "poisson")
train <- function(family) { glm(Menarche ~ Age, family = family, data = menarche) }
results <- sparkLapply(families, train)
```

## How was this patch tested?

This PR was tested in SparkR. I am unfamiliar with R and SparkR, so any feedback on style, testing, etc. will be much appreciated.

cc falaki davies

Author: Timothy Hunter <timhunter@databricks.com>

Closes #12426 from thunterdb/7264.
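As a quick orientation for readers of the patch below, here is a minimal usage sketch of the function as it is actually exported, `spark.lapply(sc, list, func)`, which takes the SparkContext as its first argument (the description above uses the earlier working name `sparkLapply`). The `master = "local[2]"` setting is only an illustrative choice:

```R
library(SparkR)

# Start SparkR and obtain the SparkContext handle expected by spark.lapply().
sc <- sparkR.init(master = "local[2]")

# Distribute the elements of 1:10 and apply the function to each one on the workers.
doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
print(doubled)  # a list: 2, 4, ..., 20

sparkR.stop()
```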
-rw-r--r--  R/pkg/NAMESPACE                            1
-rw-r--r--  R/pkg/R/context.R                         42
-rw-r--r--  R/pkg/inst/tests/testthat/test_context.R   6
3 files changed, 49 insertions, 0 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ea31baed3d..002e469efb 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -295,6 +295,7 @@ export("as.DataFrame",
"read.json",
"read.parquet",
"read.text",
+ "spark.lapply",
"sql",
"str",
"tableToDF",
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 4105a6e5c8..44bca877fd 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -226,6 +226,48 @@ setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}
+#' @title Run a function over a list of elements, distributing the computations with Spark.
+#'
+#' @description
+#' Applies a function to the elements of a list, in a manner similar to doParallel or lapply.
+#' The computations are distributed using Spark. It is conceptually the same as the following code:
+#' lapply(list, func)
+#'
+#' Known limitations:
+#' - variable scoping and capture: compared to R's rich support for variable resolution, the
+#'   distributed nature of SparkR limits how variables are resolved at runtime. All the
+#'   variables that are available through lexical scoping are embedded in the closure of the
+#'   function and available as read-only variables within the function. The environment
+#'   variables should be stored into temporary variables outside the function, and not
+#'   directly accessed within the function.
+#'
+#' - loading external packages: In order to use a package, you need to load it inside the
+#' closure. For example, if you rely on the MASS package, here is how you would use it:
+#'\dontrun{
+#' train <- function(hyperparam) {
+#'   library(MASS)
+#'   model <- lm.ridge(y ~ x + z, data, lambda = hyperparam)
+#'   model
+#' }
+#'}
+#'
+#' @rdname spark.lapply
+#' @param sc SparkContext to use
+#' @param list the list of elements
+#' @param func a function that takes one argument.
+#' @return a list of results (the exact type being determined by the function)
+#' @export
+#' @examples
+#'\dontrun{
+#' doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+#'}
+spark.lapply <- function(sc, list, func) {
+ rdd <- parallelize(sc, list, length(list))
+ results <- map(rdd, func)
+ local <- collect(results)
+ local
+}
+
#' Set new log level
#'
#' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index ffa067eb5e..ca04342cd5 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -141,3 +141,9 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
expect_that(processSparkJars(f), not(gives_warning()))
expect_match(processSparkJars(f), f)
})
+
+test_that("spark.lapply should perform simple transforms", {
+ sc <- sparkR.init()
+ doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+ expect_equal(doubled, as.list(2 * 1:10))
+})
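
Finally, a short illustrative sketch (not part of the patch) of the two caveats documented in context.R above: values the function needs are stored in plain local variables so they are captured read-only in its closure, and external packages such as MASS are loaded inside the function body, since the library() call has to run on each worker. The helper name `train_one` and the use of the built-in `longley` data set are hypothetical choices:

```R
sc <- sparkR.init()

# Store the data in a local variable outside the function; it is embedded in the
# closure and shipped to the workers as a read-only copy.
ridge_data <- longley

train_one <- function(lambda) {
  # Load external packages inside the closure: this runs on each worker,
  # where MASS may not be attached yet.
  library(MASS)
  lm.ridge(GNP.deflator ~ ., data = ridge_data, lambda = lambda)
}

# Fit one ridge model per lambda value, in parallel.
models <- spark.lapply(sc, c(0.01, 0.1, 1.0), train_one)
```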