diff options
author | Hossein <hossein@databricks.com> | 2015-06-19 15:47:22 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-06-19 15:51:59 -0700 |
commit | 1fa29c2df2a7846405eed6b409b8deb5329fa7c1 (patch) | |
tree | 260a8c9c048f4004ee1729a3a77e19ed3b2bdf9d | |
parent | 54976e55e36465108b71b40b8a431be9d6d703ce (diff) | |
download | spark-1fa29c2df2a7846405eed6b409b8deb5329fa7c1.tar.gz spark-1fa29c2df2a7846405eed6b409b8deb5329fa7c1.tar.bz2 spark-1fa29c2df2a7846405eed6b409b8deb5329fa7c1.zip |
[SPARK-8452] [SPARKR] expose jobGroup API in SparkR
This pull request adds following methods to SparkR:
```R
setJobGroup()
cancelJobGroup()
clearJobGroup()
```
For each method, the spark context is passed as the first argument. There does not seem to be a good way to test these in R.
cc shivaram and davies
Author: Hossein <hossein@databricks.com>
Closes #6889 from falaki/SPARK-8452 and squashes the following commits:
9ce9f1e [Hossein] Added basic tests to verify methods can be called and won't throw errors
c706af9 [Hossein] Added examples
a2c19af [Hossein] taking spark context as first argument
343ca77 [Hossein] Added setJobGroup, cancelJobGroup and clearJobGroup to SparkR
-rw-r--r-- | R/pkg/NAMESPACE | 5 | ||||
-rw-r--r-- | R/pkg/R/sparkR.R | 44 | ||||
-rw-r--r-- | R/pkg/inst/tests/test_context.R | 7 |
3 files changed, 56 insertions, 0 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index f9447f6c32..7f85722245 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -10,6 +10,11 @@ export("sparkR.init") export("sparkR.stop") export("print.jobj") +# Job group lifecycle management methods +export("setJobGroup", + "clearJobGroup", + "cancelJobGroup") + exportClasses("DataFrame") exportMethods("arrange", diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 5ced7c688f..2efd4f0742 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -278,3 +278,47 @@ sparkRHive.init <- function(jsc = NULL) { assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv) hiveCtx } + +#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a +#' different value or cleared. +#' +#' @param sc existing spark context +#' @param groupid the ID to be assigned to job groups +#' @param description description for the the job group ID +#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE) +#'} + +setJobGroup <- function(sc, groupId, description, interruptOnCancel) { + callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel) +} + +#' Clear current job group ID and its description +#' +#' @param sc existing spark context +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' clearJobGroup(sc) +#'} + +clearJobGroup <- function(sc) { + callJMethod(sc, "clearJobGroup") +} + +#' Cancel active jobs for the specified group +#' +#' @param sc existing spark context +#' @param groupId the ID of job group to be cancelled +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' cancelJobGroup(sc, "myJobGroup") +#'} + +cancelJobGroup <- function(sc, groupId) { + callJMethod(sc, "cancelJobGroup", groupId) +} diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R index e4aab37436..513bbc8e62 100644 --- a/R/pkg/inst/tests/test_context.R +++ b/R/pkg/inst/tests/test_context.R @@ -48,3 +48,10 @@ test_that("rdd GC across sparkR.stop", { count(rdd3) count(rdd4) }) + +test_that("job group functions can be called", { + sc <- sparkR.init() + setJobGroup(sc, "groupId", "job description", TRUE) + cancelJobGroup(sc, "groupId") + clearJobGroup(sc) +}) |