diff options
author | Felix Cheung <felixcheung_m@hotmail.com> | 2016-06-23 09:45:01 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2016-06-23 09:45:01 -0700 |
commit | b5a997667f4c0e514217da6df5af37b8b849dfdf (patch) | |
tree | 7f9e4528fd4774965b4766129170c1d539f99423 | |
parent | 65d1f0f716f50dd14b5dfe1e7fac772f1b4d2be0 (diff) | |
download | spark-b5a997667f4c0e514217da6df5af37b8b849dfdf.tar.gz spark-b5a997667f4c0e514217da6df5af37b8b849dfdf.tar.bz2 spark-b5a997667f4c0e514217da6df5af37b8b849dfdf.zip |
[SPARK-16088][SPARKR] update setJobGroup, cancelJobGroup, clearJobGroup
## What changes were proposed in this pull request?
Updated setJobGroup, cancelJobGroup, clearJobGroup to not require sc/SparkContext as parameter.
Also updated roxygen2 doc and R programming guide on deprecations.
## How was this patch tested?
unit tests
Author: Felix Cheung <felixcheung_m@hotmail.com>
Closes #13838 from felixcheung/rjobgroup.
-rw-r--r-- | R/pkg/R/DataFrame.R | 1 | ||||
-rw-r--r-- | R/pkg/R/context.R | 10 | ||||
-rw-r--r-- | R/pkg/R/sparkR.R | 68 | ||||
-rw-r--r-- | R/pkg/R/utils.R | 8 | ||||
-rw-r--r-- | R/pkg/inst/tests/testthat/test_context.R | 10 | ||||
-rw-r--r-- | docs/sparkr.md | 2 |
6 files changed, 75 insertions(+), 24 deletions(-)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 725cbf24f2..f856979c2a 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -55,7 +55,6 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) { .Object }) -#' @rdname SparkDataFrame #' @export #' @param sdf A Java object reference to the backing Scala DataFrame #' @param isCached TRUE if the SparkDataFrame is cached diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index dd0ceaeb08..2538bb2507 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -264,10 +264,7 @@ setCheckpointDir <- function(sc, dirName) { #'} #' @note spark.lapply since 2.0.0 spark.lapply <- function(list, func) { - if (!exists(".sparkRjsc", envir = .sparkREnv)) { - stop("SparkR has not been initialized. Please call sparkR.session()") - } - sc <- get(".sparkRjsc", envir = .sparkREnv) + sc <- getSparkContext() rdd <- parallelize(sc, list, length(list)) results <- map(rdd, func) local <- collect(results) @@ -287,9 +284,6 @@ spark.lapply <- function(list, func) { #'} #' @note setLogLevel since 2.0.0 setLogLevel <- function(level) { - if (!exists(".sparkRjsc", envir = .sparkREnv)) { - stop("SparkR has not been initialized. Please call sparkR.session()") - } - sc <- get(".sparkRjsc", envir = .sparkREnv) + sc <- getSparkContext() callJMethod(sc, "setLogLevel", level) } diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 2b6e124151..62659b0c0c 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -392,47 +392,91 @@ sparkR.session <- function( #' Assigns a group ID to all the jobs started by this thread until the group ID is set to a #' different value or cleared. 
#' -#' @param sc existing spark context #' @param groupid the ID to be assigned to job groups #' @param description description for the job group ID #' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation +#' @rdname setJobGroup +#' @name setJobGroup #' @examples #'\dontrun{ -#' sc <- sparkR.init() -#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE) +#' sparkR.session() +#' setJobGroup("myJobGroup", "My job group description", TRUE) #'} #' @note setJobGroup since 1.5.0 -setJobGroup <- function(sc, groupId, description, interruptOnCancel) { +#' @method setJobGroup default +setJobGroup.default <- function(groupId, description, interruptOnCancel) { + sc <- getSparkContext() callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel) } +setJobGroup <- function(sc, groupId, description, interruptOnCancel) { + if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) { + .Deprecated("setJobGroup(groupId, description, interruptOnCancel)", + old = "setJobGroup(sc, groupId, description, interruptOnCancel)") + setJobGroup.default(groupId, description, interruptOnCancel) + } else { + # Parameter order is shifted + groupIdToUse <- sc + descriptionToUse <- groupId + interruptOnCancelToUse <- description + setJobGroup.default(groupIdToUse, descriptionToUse, interruptOnCancelToUse) + } +} + #' Clear current job group ID and its description #' -#' @param sc existing spark context +#' @rdname clearJobGroup +#' @name clearJobGroup #' @examples #'\dontrun{ -#' sc <- sparkR.init() -#' clearJobGroup(sc) +#' sparkR.session() +#' clearJobGroup() #'} #' @note clearJobGroup since 1.5.0 -clearJobGroup <- function(sc) { +#' @method clearJobGroup default +clearJobGroup.default <- function() { + sc <- getSparkContext() callJMethod(sc, "clearJobGroup") } +clearJobGroup <- function(sc) { + if (!missing(sc) && + class(sc) == "jobj" && + any(grepl("JavaSparkContext", getClassName.jobj(sc)))) { + 
.Deprecated("clearJobGroup()", old = "clearJobGroup(sc)") + } + clearJobGroup.default() +} + + #' Cancel active jobs for the specified group #' -#' @param sc existing spark context #' @param groupId the ID of job group to be cancelled +#' @rdname cancelJobGroup +#' @name cancelJobGroup #' @examples #'\dontrun{ -#' sc <- sparkR.init() -#' cancelJobGroup(sc, "myJobGroup") +#' sparkR.session() +#' cancelJobGroup("myJobGroup") #'} #' @note cancelJobGroup since 1.5.0 -cancelJobGroup <- function(sc, groupId) { +#' @method cancelJobGroup default +cancelJobGroup.default <- function(groupId) { + sc <- getSparkContext() callJMethod(sc, "cancelJobGroup", groupId) } +cancelJobGroup <- function(sc, groupId) { + if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) { + .Deprecated("cancelJobGroup(groupId)", old = "cancelJobGroup(sc, groupId)") + cancelJobGroup.default(groupId) + } else { + # Parameter order is shifted + groupIdToUse <- sc + cancelJobGroup.default(groupIdToUse) + } +} + sparkConfToSubmitOps <- new.env() sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory" sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path" diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index d5c062d3bc..e75bfbf037 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -685,3 +685,11 @@ launchScript <- function(script, combinedArgs, capture = FALSE) { system2(script, combinedArgs, wait = capture, stdout = capture) } } + +getSparkContext <- function() { + if (!exists(".sparkRjsc", envir = .sparkREnv)) { + stop("SparkR has not been initialized. 
Please call sparkR.session()") + } + sc <- get(".sparkRjsc", envir = .sparkREnv) + sc +} diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 3d232df566..2a1bd61b11 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -100,9 +100,13 @@ test_that("rdd GC across sparkR.stop", { test_that("job group functions can be called", { sc <- sparkR.sparkContext() - setJobGroup(sc, "groupId", "job description", TRUE) - cancelJobGroup(sc, "groupId") - clearJobGroup(sc) + setJobGroup("groupId", "job description", TRUE) + cancelJobGroup("groupId") + clearJobGroup() + + suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE)) + suppressWarnings(cancelJobGroup(sc, "groupId")) + suppressWarnings(clearJobGroup(sc)) sparkR.session.stop() }) diff --git a/docs/sparkr.md b/docs/sparkr.md index 9e74e4a96a..32ef815eb1 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -428,3 +428,5 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma - The `sqlContext` parameter is no longer required for these functions: `createDataFrame`, `as.DataFrame`, `read.json`, `jsonFile`, `read.parquet`, `parquetFile`, `read.text`, `sql`, `tables`, `tableNames`, `cacheTable`, `uncacheTable`, `clearCache`, `dropTempTable`, `read.df`, `loadDF`, `createExternalTable`. - The method `registerTempTable` has been deprecated to be replaced by `createOrReplaceTempView`. - The method `dropTempTable` has been deprecated to be replaced by `dropTempView`. + - The `sc` SparkContext parameter is no longer required for these functions: `setJobGroup`, `clearJobGroup`, `cancelJobGroup` + |