From 43b04b7ecb313a2cee6121dd575de1f7dc785c11 Mon Sep 17 00:00:00 2001
From: Kai Jiang
Date: Wed, 22 Jun 2016 12:50:36 -0700
Subject: [SPARK-15672][R][DOC] R programming guide update

## What changes were proposed in this pull request?

Guide for
- UDFs with dapply, dapplyCollect
- spark.lapply for running parallel R functions

## How was this patch tested?

build locally

screen shot 2016-06-14 at 03 12 56

Author: Kai Jiang

Closes #13660 from vectorijk/spark-15672-R-guide-update.
---
 docs/sparkr.md | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 77 insertions(+)
(limited to 'docs')

diff --git a/docs/sparkr.md b/docs/sparkr.md
index f0189012f3..9e74e4a96a 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -255,6 +255,83 @@ head(df)
 {% endhighlight %}
 </div>
 
+### Applying User-Defined Functions
+In SparkR, we support several kinds of User-Defined Functions:
+
+#### Run a given function on a large dataset using `dapply` or `dapplyCollect`
+
+##### dapply
+Apply a function to each partition of a `SparkDataFrame`. The function to be applied should have only one parameter,
+to which a `data.frame` corresponding to a partition is passed. The output of the function should also be a `data.frame`.
+The schema specifies the row format of the resulting `SparkDataFrame` and must match the output of the R function.
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Convert waiting time from minutes to seconds.
+# Note that we can apply a UDF to a DataFrame.
+schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
+                     structField("waiting_secs", "double"))
+df1 <- dapply(df, function(x) {x <- cbind(x, x$waiting * 60)}, schema)
+head(collect(df1))
+##  eruptions waiting waiting_secs
+##1     3.600      79         4740
+##2     1.800      54         3240
+##3     3.333      74         4440
+##4     2.283      62         3720
+##5     4.533      85         5100
+##6     2.883      55         3300
+{% endhighlight %}
+</div>
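+
+As a quick check, the schema of the resulting `SparkDataFrame` can be printed with `printSchema`; this is a small
+illustrative follow-up using `df1` from the example above, not additional API.
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# Print the schema of df1; it should list the three columns declared in the
+# structType above: eruptions, waiting and waiting_secs.
+printSchema(df1)
+{% endhighlight %}
+</div>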
+
+##### dapplyCollect
+Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back to a local R `data.frame`.
+The output of the function should be a `data.frame`, but no schema is required to be passed. Note that `dapplyCollect` can
+only be used if the output of the UDF on all partitions fits in the driver's memory.
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Convert waiting time from minutes to seconds.
+# Note that we can apply a UDF to a DataFrame and return an R data.frame.
+ldf <- dapplyCollect(
+         df,
+         function(x) {
+           x <- cbind(x, "waiting_secs" = x$waiting * 60)
+         })
+head(ldf, 3)
+##  eruptions waiting waiting_secs
+##1     3.600      79         4740
+##2     1.800      54         3240
+##3     3.333      74         4440
+
+{% endhighlight %}
+</div>
+
+#### Run local R functions distributed using `spark.lapply`
+
+##### spark.lapply
+Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark.
+It applies a function to the elements of a list in a manner similar to `doParallel` or `lapply`. The results of all the computations
+should fit on a single machine. If that is not the case, you can do something like `df <- createDataFrame(list)` and then use
+`dapply`, as sketched after the example below.
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Perform distributed training of multiple models with spark.lapply. Here, we pass
+# a read-only list of arguments which determines the family of the generalized linear model.
+families <- c("gaussian", "poisson")
+train <- function(family) {
+  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
+  summary(model)
+}
+# Return a list of model summaries
+model.summaries <- spark.lapply(families, train)
+
+# Print the summary of each model
+print(model.summaries)
+
+{% endhighlight %}
+</div>
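+
+If the combined results would not fit on a single machine, the input can instead be converted to a `SparkDataFrame`
+with `createDataFrame` and the function applied per partition with `dapply`, so that the results stay distributed.
+The following is only a minimal sketch of that fallback; the input, column names and schema are illustrative.
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# Illustrative input: a local data.frame of integers converted to a SparkDataFrame.
+inputs <- createDataFrame(data.frame(x = 1:1000))
+# Declare the schema of the per-partition output; it must match the cbind result below.
+schema <- structType(structField("x", "integer"), structField("x_squared", "double"))
+# Apply the computation to each partition; the result remains a distributed SparkDataFrame.
+outputs <- dapply(inputs, function(p) { cbind(p, p$x ^ 2) }, schema)
+head(outputs)
+{% endhighlight %}
+</div>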
+
 ## Running SQL Queries from SparkR
 A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data.
 The `sql` function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.
-- 
cgit v1.2.3