author    Hossein <hossein@databricks.com>    2015-06-19 15:47:22 -0700
committer Shivaram Venkataraman <shivaram@cs.berkeley.edu>    2015-06-19 15:51:59 -0700
commit    1fa29c2df2a7846405eed6b409b8deb5329fa7c1 (patch)
tree      260a8c9c048f4004ee1729a3a77e19ed3b2bdf9d /R
parent    54976e55e36465108b71b40b8a431be9d6d703ce (diff)
[SPARK-8452] [SPARKR] expose jobGroup API in SparkR
This pull request adds the following methods to SparkR:

```R
setJobGroup()
cancelJobGroup()
clearJobGroup()
```

For each method, the Spark context is passed as the first argument. There does not seem to be a good way to test these in R.

cc shivaram and davies

Author: Hossein <hossein@databricks.com>

Closes #6889 from falaki/SPARK-8452 and squashes the following commits:

9ce9f1e [Hossein] Added basic tests to verify methods can be called and won't throw errors
c706af9 [Hossein] Added examples
a2c19af [Hossein] taking spark context as first argument
343ca77 [Hossein] Added setJobGroup, cancelJobGroup and clearJobGroup to SparkR
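A minimal usage sketch of the new API (not part of the commit; assumes a local Spark backend is available, as in the roxygen examples below):

```R
library(SparkR)

sc <- sparkR.init()

# Tag every job submitted from this thread with a group ID, a description,
# and interruptOnCancel = TRUE so running tasks are interrupted on cancel.
setJobGroup(sc, "myJobGroup", "My job group description", TRUE)

# ... trigger Spark jobs here; they all belong to "myJobGroup" ...

cancelJobGroup(sc, "myJobGroup")  # cancel active jobs in the group
clearJobGroup(sc)                 # stop tagging jobs from this thread

sparkR.stop()
```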
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                  |  5
-rw-r--r--  R/pkg/R/sparkR.R                 | 44
-rw-r--r--  R/pkg/inst/tests/test_context.R  |  7
3 files changed, 56 insertions(+), 0 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index f9447f6c32..7f85722245 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -10,6 +10,11 @@ export("sparkR.init")
export("sparkR.stop")
export("print.jobj")
+# Job group lifecycle management methods
+export("setJobGroup",
+ "clearJobGroup",
+ "cancelJobGroup")
+
exportClasses("DataFrame")
exportMethods("arrange",
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 5ced7c688f..2efd4f0742 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -278,3 +278,47 @@ sparkRHive.init <- function(jsc = NULL) {
assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
hiveCtx
}
+
+#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+#' different value or cleared.
+#'
+#' @param sc existing spark context
+#' @param groupId the ID to be assigned to job groups
+#' @param description description for the job group ID
+#' @param interruptOnCancel flag to indicate whether running jobs are interrupted on job cancellation
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
+#'}
+
+setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+ callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
+}
+
+#' Clear current job group ID and its description
+#'
+#' @param sc existing spark context
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' clearJobGroup(sc)
+#'}
+
+clearJobGroup <- function(sc) {
+ callJMethod(sc, "clearJobGroup")
+}
+
+#' Cancel active jobs for the specified group
+#'
+#' @param sc existing spark context
+#' @param groupId the ID of the job group to be cancelled
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' cancelJobGroup(sc, "myJobGroup")
+#'}
+
+cancelJobGroup <- function(sc, groupId) {
+ callJMethod(sc, "cancelJobGroup", groupId)
+}
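Each wrapper above is a one-line delegation through SparkR's JVM bridge: `callJMethod(obj, method, ...)` invokes the named method on the Java object that `sc` references, so these functions map directly onto the `setJobGroup`, `cancelJobGroup`, and `clearJobGroup` methods of the JVM Spark context. As an illustration of the same pattern (a sketch only, not part of this patch), a wrapper for `setLocalProperty`, which exists on the JVM context but is not exposed here, would look like:

```R
# Sketch: expose another JVM SparkContext method to R by forwarding
# arguments through callJMethod (not part of this patch).
setLocalProperty <- function(sc, key, value) {
  callJMethod(sc, "setLocalProperty", key, value)
}
```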
diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R
index e4aab37436..513bbc8e62 100644
--- a/R/pkg/inst/tests/test_context.R
+++ b/R/pkg/inst/tests/test_context.R
@@ -48,3 +48,10 @@ test_that("rdd GC across sparkR.stop", {
count(rdd3)
count(rdd4)
})
+
+test_that("job group functions can be called", {
+ sc <- sparkR.init()
+ setJobGroup(sc, "groupId", "job description", TRUE)
+ cancelJobGroup(sc, "groupId")
+ clearJobGroup(sc)
+})
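The test above only verifies that the calls can be made. A hedged sketch of a marginally stronger variant (not in the patch) that fails if any of the three calls raises an error:

```R
test_that("job group functions run without error", {
  sc <- sparkR.init()
  # Capture any error from the three calls and turn it into a test failure.
  ok <- tryCatch({
    setJobGroup(sc, "groupId", "job description", TRUE)
    cancelJobGroup(sc, "groupId")
    clearJobGroup(sc)
    TRUE
  }, error = function(e) FALSE)
  expect_true(ok)
})
```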