aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--R/pkg/R/context.R2
-rw-r--r--docs/sparkr.md77
2 files changed, 78 insertions, 1 deletions
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 96ef9438ad..dd0ceaeb08 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -246,7 +246,7 @@ setCheckpointDir <- function(sc, dirName) {
#' \preformatted{
#' train <- function(hyperparam) {
#' library(MASS)
-#' lm.ridge(“y ~ x+z”, data, lambda=hyperparam)
+#' lm.ridge("y ~ x+z", data, lambda=hyperparam)
#' model
#' }
#' }
diff --git a/docs/sparkr.md b/docs/sparkr.md
index f0189012f3..9e74e4a96a 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -255,6 +255,83 @@ head(df)
{% endhighlight %}
</div>
+### Applying User-Defined Function
+In SparkR, we support several kinds of User-Defined Functions:
+
+#### Run a given function on a large dataset using `dapply` or `dapplyCollect`
+
+##### dapply
+Apply a function to each partition of a `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame`
+and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function
+should be a `data.frame`. Schema specifies the row format of the resulting a `SparkDataFrame`. It must match the R function's output.
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Convert waiting time from hours to seconds.
+# Note that we can apply UDF to DataFrame.
+schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
+ structField("waiting_secs", "double"))
+df1 <- dapply(df, function(x) {x <- cbind(x, x$waiting * 60)}, schema)
+head(collect(df1))
+## eruptions waiting waiting_secs
+##1 3.600 79 4740
+##2 1.800 54 3240
+##3 3.333 74 4440
+##4 2.283 62 3720
+##5 4.533 85 5100
+##6 2.883 55 3300
+{% endhighlight %}
+</div>
+
+##### dapplyCollect
+Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of function
+should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` only can be used if the
+output of UDF run on all the partitions can fit in driver memory.
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Convert waiting time from hours to seconds.
+# Note that we can apply UDF to DataFrame and return a R's data.frame
+ldf <- dapplyCollect(
+ df,
+ function(x) {
+ x <- cbind(x, "waiting_secs" = x$waiting * 60)
+ })
+head(ldf, 3)
+## eruptions waiting waiting_secs
+##1 3.600 79 4740
+##2 1.800 54 3240
+##3 3.333 74 4440
+
+{% endhighlight %}
+</div>
+
+#### Run local R functions distributed using `spark.lapply`
+
+##### spark.lapply
+Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark.
+Applies a function in a manner that is similar to `doParallel` or `lapply` to elements of a list. The results of all the computations
+should fit in a single machine. If that is not the case they can do something like `df <- createDataFrame(list)` and then use
+`dapply`
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Perform distributed training of multiple models with spark.lapply. Here, we pass
+# a read-only list of arguments which specifies family the generalized linear model should be.
+families <- c("gaussian", "poisson")
+train <- function(family) {
+ model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
+ summary(model)
+}
+# Return a list of model's summaries
+model.summaries <- spark.lapply(families, train)
+
+# Print the summary of each model
+print(model.summaries)
+
+{% endhighlight %}
+</div>
+
## Running SQL Queries from SparkR
A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data.
The `sql` function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.