From b5a997667f4c0e514217da6df5af37b8b849dfdf Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Thu, 23 Jun 2016 09:45:01 -0700
Subject: [SPARK-16088][SPARKR] update setJobGroup, cancelJobGroup, clearJobGroup

## What changes were proposed in this pull request?

Updated setJobGroup, cancelJobGroup, and clearJobGroup so they no longer require sc/SparkContext as a parameter. Also updated the roxygen2 docs and the R programming guide to cover the deprecations.

## How was this patch tested?

Unit tests.

Author: Felix Cheung

Closes #13838 from felixcheung/rjobgroup.
---
 R/pkg/R/DataFrame.R                      |  1 -
 R/pkg/R/context.R                        | 10 +----
 R/pkg/R/sparkR.R                         | 68 ++++++++++++++++++++++++++------
 R/pkg/R/utils.R                          |  8 ++++
 R/pkg/inst/tests/testthat/test_context.R | 10 +++--
 5 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 725cbf24f2..f856979c2a 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -55,7 +55,6 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
   .Object
 })
 
-#' @rdname SparkDataFrame
 #' @export
 #' @param sdf A Java object reference to the backing Scala DataFrame
 #' @param isCached TRUE if the SparkDataFrame is cached
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index dd0ceaeb08..2538bb2507 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -264,10 +264,7 @@ setCheckpointDir <- function(sc, dirName) {
 #'}
 #' @note spark.lapply since 2.0.0
 spark.lapply <- function(list, func) {
-  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
-    stop("SparkR has not been initialized. Please call sparkR.session()")
-  }
-  sc <- get(".sparkRjsc", envir = .sparkREnv)
+  sc <- getSparkContext()
   rdd <- parallelize(sc, list, length(list))
   results <- map(rdd, func)
   local <- collect(results)
@@ -287,9 +284,6 @@ spark.lapply <- function(list, func) {
 #'}
 #' @note setLogLevel since 2.0.0
 setLogLevel <- function(level) {
-  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
-    stop("SparkR has not been initialized. Please call sparkR.session()")
-  }
-  sc <- get(".sparkRjsc", envir = .sparkREnv)
+  sc <- getSparkContext()
   callJMethod(sc, "setLogLevel", level)
 }
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 2b6e124151..62659b0c0c 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -392,47 +392,91 @@ sparkR.session <- function(
 #' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
 #' different value or cleared.
 #'
-#' @param sc existing spark context
 #' @param groupid the ID to be assigned to job groups
 #' @param description description for the job group ID
 #' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
+#' @rdname setJobGroup
+#' @name setJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
+#' sparkR.session()
+#' setJobGroup("myJobGroup", "My job group description", TRUE)
 #'}
 #' @note setJobGroup since 1.5.0
-setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+#' @method setJobGroup default
+setJobGroup.default <- function(groupId, description, interruptOnCancel) {
+  sc <- getSparkContext()
   callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
 }
 
+setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+  if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("setJobGroup(groupId, description, interruptOnCancel)",
+                old = "setJobGroup(sc, groupId, description, interruptOnCancel)")
+    setJobGroup.default(groupId, description, interruptOnCancel)
+  } else {
+    # Parameter order is shifted
+    groupIdToUse <- sc
+    descriptionToUse <- groupId
+    interruptOnCancelToUse <- description
+    setJobGroup.default(groupIdToUse, descriptionToUse, interruptOnCancelToUse)
+  }
+}
+
 #' Clear current job group ID and its description
 #'
-#' @param sc existing spark context
+#' @rdname clearJobGroup
+#' @name clearJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' clearJobGroup(sc)
+#' sparkR.session()
+#' clearJobGroup()
 #'}
 #' @note clearJobGroup since 1.5.0
-clearJobGroup <- function(sc) {
+#' @method clearJobGroup default
+clearJobGroup.default <- function() {
+  sc <- getSparkContext()
   callJMethod(sc, "clearJobGroup")
 }
 
+clearJobGroup <- function(sc) {
+  if (!missing(sc) &&
+      class(sc) == "jobj" &&
+      any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("clearJobGroup()", old = "clearJobGroup(sc)")
+  }
+  clearJobGroup.default()
+}
+
+
 #' Cancel active jobs for the specified group
 #'
-#' @param sc existing spark context
 #' @param groupId the ID of job group to be cancelled
+#' @rdname cancelJobGroup
+#' @name cancelJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' cancelJobGroup(sc, "myJobGroup")
+#' sparkR.session()
+#' cancelJobGroup("myJobGroup")
 #'}
 #' @note cancelJobGroup since 1.5.0
-cancelJobGroup <- function(sc, groupId) {
+#' @method cancelJobGroup default
+cancelJobGroup.default <- function(groupId) {
+  sc <- getSparkContext()
   callJMethod(sc, "cancelJobGroup", groupId)
 }
 
+cancelJobGroup <- function(sc, groupId) {
+  if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("cancelJobGroup(groupId)", old = "cancelJobGroup(sc, groupId)")
+    cancelJobGroup.default(groupId)
+  } else {
+    # Parameter order is shifted
+    groupIdToUse <- sc
+    cancelJobGroup.default(groupIdToUse)
+  }
+}
+
 sparkConfToSubmitOps <- new.env()
 sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory"
 sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path"
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index d5c062d3bc..e75bfbf037 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -685,3 +685,11 @@ launchScript <- function(script, combinedArgs, capture = FALSE) {
     system2(script, combinedArgs, wait = capture, stdout = capture)
   }
 }
+
+getSparkContext <- function() {
+  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
stop("SparkR has not been initialized. Please call sparkR.session()") + } + sc <- get(".sparkRjsc", envir = .sparkREnv) + sc +} diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 3d232df566..2a1bd61b11 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -100,9 +100,13 @@ test_that("rdd GC across sparkR.stop", { test_that("job group functions can be called", { sc <- sparkR.sparkContext() - setJobGroup(sc, "groupId", "job description", TRUE) - cancelJobGroup(sc, "groupId") - clearJobGroup(sc) + setJobGroup("groupId", "job description", TRUE) + cancelJobGroup("groupId") + clearJobGroup() + + suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE)) + suppressWarnings(cancelJobGroup(sc, "groupId")) + suppressWarnings(clearJobGroup(sc)) sparkR.session.stop() }) -- cgit v1.2.3