aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--R/pkg/NAMESPACE5
-rw-r--r--R/pkg/R/sparkR.R44
-rw-r--r--R/pkg/inst/tests/test_context.R7
3 files changed, 56 insertions, 0 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index f9447f6c32..7f85722245 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -10,6 +10,11 @@ export("sparkR.init")
export("sparkR.stop")
export("print.jobj")
+# Job group lifecycle management methods
+export("setJobGroup",
+ "clearJobGroup",
+ "cancelJobGroup")
+
exportClasses("DataFrame")
exportMethods("arrange",
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 5ced7c688f..2efd4f0742 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -278,3 +278,47 @@ sparkRHive.init <- function(jsc = NULL) {
assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
hiveCtx
}
+
+#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+#' different value or cleared.
+#'
+#' @param sc existing spark context
+#' @param groupid the ID to be assigned to job groups
+#' @param description description for the the job group ID
+#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
+#'}
+
+setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+ callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
+}
+
+#' Clear current job group ID and its description
+#'
+#' @param sc existing spark context
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' clearJobGroup(sc)
+#'}
+
+clearJobGroup <- function(sc) {
+ callJMethod(sc, "clearJobGroup")
+}
+
+#' Cancel active jobs for the specified group
+#'
+#' @param sc existing spark context
+#' @param groupId the ID of job group to be cancelled
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' cancelJobGroup(sc, "myJobGroup")
+#'}
+
+cancelJobGroup <- function(sc, groupId) {
+ callJMethod(sc, "cancelJobGroup", groupId)
+}
diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R
index e4aab37436..513bbc8e62 100644
--- a/R/pkg/inst/tests/test_context.R
+++ b/R/pkg/inst/tests/test_context.R
@@ -48,3 +48,10 @@ test_that("rdd GC across sparkR.stop", {
count(rdd3)
count(rdd4)
})
+
+test_that("job group functions can be called", {
+ sc <- sparkR.init()
+ setJobGroup(sc, "groupId", "job description", TRUE)
+ cancelJobGroup(sc, "groupId")
+ clearJobGroup(sc)
+})