author     Felix Cheung <felixcheung_m@hotmail.com>          2016-06-23 09:45:01 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2016-06-23 09:45:01 -0700
commit     b5a997667f4c0e514217da6df5af37b8b849dfdf (patch)
tree       7f9e4528fd4774965b4766129170c1d539f99423
parent     65d1f0f716f50dd14b5dfe1e7fac772f1b4d2be0 (diff)
[SPARK-16088][SPARKR] update setJobGroup, cancelJobGroup, clearJobGroup
## What changes were proposed in this pull request?

Updated setJobGroup, cancelJobGroup, and clearJobGroup so that they no longer require sc/SparkContext as a parameter. Also updated the roxygen2 docs and the R programming guide regarding the deprecations.

## How was this patch tested?

unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #13838 from felixcheung/rjobgroup.
-rw-r--r--  R/pkg/R/DataFrame.R                        1
-rw-r--r--  R/pkg/R/context.R                         10
-rw-r--r--  R/pkg/R/sparkR.R                          68
-rw-r--r--  R/pkg/R/utils.R                            8
-rw-r--r--  R/pkg/inst/tests/testthat/test_context.R  10
-rw-r--r--  docs/sparkr.md                             2
6 files changed, 75 insertions(+), 24 deletions(-)
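
In practice the change lets callers drop the SparkContext argument entirely; a minimal usage sketch of the new and the deprecated forms, mirroring the roxygen2 examples updated in the diff below:

sparkR.session()
setJobGroup("myJobGroup", "My job group description", TRUE)   # new form, no sc
cancelJobGroup("myJobGroup")
clearJobGroup()
# The old sc-first signatures still work for now but emit a .Deprecated() warning:
sc <- sparkR.sparkContext()
setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
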
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 725cbf24f2..f856979c2a 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -55,7 +55,6 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
.Object
})
-#' @rdname SparkDataFrame
#' @export
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the SparkDataFrame is cached
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index dd0ceaeb08..2538bb2507 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -264,10 +264,7 @@ setCheckpointDir <- function(sc, dirName) {
#'}
#' @note spark.lapply since 2.0.0
spark.lapply <- function(list, func) {
- if (!exists(".sparkRjsc", envir = .sparkREnv)) {
- stop("SparkR has not been initialized. Please call sparkR.session()")
- }
- sc <- get(".sparkRjsc", envir = .sparkREnv)
+ sc <- getSparkContext()
rdd <- parallelize(sc, list, length(list))
results <- map(rdd, func)
local <- collect(results)
@@ -287,9 +284,6 @@ spark.lapply <- function(list, func) {
#'}
#' @note setLogLevel since 2.0.0
setLogLevel <- function(level) {
- if (!exists(".sparkRjsc", envir = .sparkREnv)) {
- stop("SparkR has not been initialized. Please call sparkR.session()")
- }
- sc <- get(".sparkRjsc", envir = .sparkREnv)
+ sc <- getSparkContext()
callJMethod(sc, "setLogLevel", level)
}
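
The two call sites in context.R are only refactored to use the new getSparkContext() helper (added in utils.R below); their user-facing behavior is unchanged. An illustrative call sequence, assuming an active session:

sparkR.session()
setLogLevel("WARN")                                    # looks up the context internally
doubled <- spark.lapply(1:10, function(x) { 2 * x })   # returns a local R list
# Without an active session, both calls stop with:
# "SparkR has not been initialized. Please call sparkR.session()"
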
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 2b6e124151..62659b0c0c 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -392,47 +392,91 @@ sparkR.session <- function(
#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
#' different value or cleared.
#'
-#' @param sc existing spark context
#' @param groupid the ID to be assigned to job groups
#' @param description description for the job group ID
#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
+#' @rdname setJobGroup
+#' @name setJobGroup
#' @examples
#'\dontrun{
-#' sc <- sparkR.init()
-#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
+#' sparkR.session()
+#' setJobGroup("myJobGroup", "My job group description", TRUE)
#'}
#' @note setJobGroup since 1.5.0
-setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+#' @method setJobGroup default
+setJobGroup.default <- function(groupId, description, interruptOnCancel) {
+ sc <- getSparkContext()
callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
}
+setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+ if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+ .Deprecated("setJobGroup(groupId, description, interruptOnCancel)",
+ old = "setJobGroup(sc, groupId, description, interruptOnCancel)")
+ setJobGroup.default(groupId, description, interruptOnCancel)
+ } else {
+ # Parameter order is shifted
+ groupIdToUse <- sc
+ descriptionToUse <- groupId
+ interruptOnCancelToUse <- description
+ setJobGroup.default(groupIdToUse, descriptionToUse, interruptOnCancelToUse)
+ }
+}
+
#' Clear current job group ID and its description
#'
-#' @param sc existing spark context
+#' @rdname clearJobGroup
+#' @name clearJobGroup
#' @examples
#'\dontrun{
-#' sc <- sparkR.init()
-#' clearJobGroup(sc)
+#' sparkR.session()
+#' clearJobGroup()
#'}
#' @note clearJobGroup since 1.5.0
-clearJobGroup <- function(sc) {
+#' @method clearJobGroup default
+clearJobGroup.default <- function() {
+ sc <- getSparkContext()
callJMethod(sc, "clearJobGroup")
}
+clearJobGroup <- function(sc) {
+ if (!missing(sc) &&
+ class(sc) == "jobj" &&
+ any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+ .Deprecated("clearJobGroup()", old = "clearJobGroup(sc)")
+ }
+ clearJobGroup.default()
+}
+
+
#' Cancel active jobs for the specified group
#'
-#' @param sc existing spark context
#' @param groupId the ID of job group to be cancelled
+#' @rdname cancelJobGroup
+#' @name cancelJobGroup
#' @examples
#'\dontrun{
-#' sc <- sparkR.init()
-#' cancelJobGroup(sc, "myJobGroup")
+#' sparkR.session()
+#' cancelJobGroup("myJobGroup")
#'}
#' @note cancelJobGroup since 1.5.0
-cancelJobGroup <- function(sc, groupId) {
+#' @method cancelJobGroup default
+cancelJobGroup.default <- function(groupId) {
+ sc <- getSparkContext()
callJMethod(sc, "cancelJobGroup", groupId)
}
+cancelJobGroup <- function(sc, groupId) {
+ if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+ .Deprecated("cancelJobGroup(groupId)", old = "cancelJobGroup(sc, groupId)")
+ cancelJobGroup.default(groupId)
+ } else {
+ # Parameter order is shifted
+ groupIdToUse <- sc
+ cancelJobGroup.default(groupIdToUse)
+ }
+}
+
sparkConfToSubmitOps <- new.env()
sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory"
sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path"
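
The wrappers keep the old sc-first formal list so existing code keeps running: when the first argument is a jobj wrapping a JavaSparkContext they warn and forward, otherwise every argument has shifted one position to the left and is re-mapped before delegating to the .default implementation. What a legacy caller sees is roughly the following (the warning wording comes from base R's .Deprecated and is shown here for illustration):

sc <- sparkR.sparkContext()
setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
# Warning message:
# 'setJobGroup(sc, groupId, description, interruptOnCancel)' is deprecated.
# Use 'setJobGroup(groupId, description, interruptOnCancel)' instead.
# See help("Deprecated")
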
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index d5c062d3bc..e75bfbf037 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -685,3 +685,11 @@ launchScript <- function(script, combinedArgs, capture = FALSE) {
system2(script, combinedArgs, wait = capture, stdout = capture)
}
}
+
+getSparkContext <- function() {
+ if (!exists(".sparkRjsc", envir = .sparkREnv)) {
+ stop("SparkR has not been initialized. Please call sparkR.session()")
+ }
+ sc <- get(".sparkRjsc", envir = .sparkREnv)
+ sc
+}
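
getSparkContext() centralizes the lookup-and-error boilerplate that spark.lapply, setLogLevel, and the job-group functions previously duplicated. Any other internal that needs the live JavaSparkContext could follow the same one-liner pattern; a hypothetical sketch (getDefaultParallelism is not part of this change):

getDefaultParallelism <- function() {
  sc <- getSparkContext()                 # errors if sparkR.session() was never called
  callJMethod(sc, "defaultParallelism")   # delegates to the JVM-side JavaSparkContext
}
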
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 3d232df566..2a1bd61b11 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -100,9 +100,13 @@ test_that("rdd GC across sparkR.stop", {
test_that("job group functions can be called", {
sc <- sparkR.sparkContext()
- setJobGroup(sc, "groupId", "job description", TRUE)
- cancelJobGroup(sc, "groupId")
- clearJobGroup(sc)
+ setJobGroup("groupId", "job description", TRUE)
+ cancelJobGroup("groupId")
+ clearJobGroup()
+
+ suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE))
+ suppressWarnings(cancelJobGroup(sc, "groupId"))
+ suppressWarnings(clearJobGroup(sc))
sparkR.session.stop()
})
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 9e74e4a96a..32ef815eb1 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -428,3 +428,5 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma
- The `sqlContext` parameter is no longer required for these functions: `createDataFrame`, `as.DataFrame`, `read.json`, `jsonFile`, `read.parquet`, `parquetFile`, `read.text`, `sql`, `tables`, `tableNames`, `cacheTable`, `uncacheTable`, `clearCache`, `dropTempTable`, `read.df`, `loadDF`, `createExternalTable`.
- The method `registerTempTable` has been deprecated to be replaced by `createOrReplaceTempView`.
- The method `dropTempTable` has been deprecated to be replaced by `dropTempView`.
+ - The `sc` SparkContext parameter is no longer required for these functions: `setJobGroup`, `clearJobGroup`, `cancelJobGroup`
+