author     felixcheung <felixcheung_m@hotmail.com>              2015-12-03 13:25:20 -0800
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>     2015-12-03 13:25:20 -0800
commit     2213441e5e0fba01e05826257604aa427cdf2598 (patch)
tree       f9628f8232f8f65c15e0774b94cfdc4ca0c3a127 /R/pkg
parent     a02d47277379e1e82d0ee41b2205434f9ffbc3e5 (diff)
[SPARK-12019][SPARKR] Support character vector for sparkR.init(), check param and fix doc
and add tests. Spark submit expects a comma-separated list.

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #10034 from felixcheung/sparkrinitdoc.
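For illustration (the jar names and package coordinate mirror the examples and tests in the diff below, not a prescribed setup), the change lets sparkR.init() take character vectors for sparkJars and sparkPackages, while the old comma-separated string form keeps working but is deprecated:

  sc <- sparkR.init(master = "local[2]", appName = "SparkR",
                    sparkJars = c("one.jar", "two.jar"),
                    sparkPackages = c("com.databricks:spark-csv_2.10:1.3.0"))

  # still accepted, but now emits a deprecation warning
  sc <- sparkR.init(master = "local[2]", appName = "SparkR",
                    sparkJars = "one.jar,two.jar")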
Diffstat (limited to 'R/pkg')
-rw-r--r--  R/pkg/R/client.R                  10
-rw-r--r--  R/pkg/R/sparkR.R                  56
-rw-r--r--  R/pkg/R/utils.R                    5
-rw-r--r--  R/pkg/inst/tests/test_client.R     9
-rw-r--r--  R/pkg/inst/tests/test_context.R   20
5 files changed, 79 insertions(+), 21 deletions(-)
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index c811d1dac3..25e99390a9 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -44,12 +44,16 @@ determineSparkSubmitBin <- function() {
}
generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
+ jars <- paste0(jars, collapse = ",")
if (jars != "") {
- jars <- paste("--jars", jars)
+ # construct the jars argument with a space between --jars and comma-separated values
+ jars <- paste0("--jars ", jars)
}
- if (!identical(packages, "")) {
- packages <- paste("--packages", packages)
+ packages <- paste0(packages, collapse = ",")
+ if (packages != "") {
+ # construct the packages argument with a space between --packages and comma-separated values
+ packages <- paste0("--packages ", packages)
}
combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ")
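As a rough sketch of the new behaviour (the call below is illustrative, not part of the patch): generateSparkSubmitArgs() now collapses a character vector into the comma-separated form spark-submit expects before prefixing the flag:

  generateSparkSubmitArgs("", "", c("one.jar", "two.jar"), "sparkr-shell", "")
  # the returned string contains "--jars one.jar,two.jar" followed by "sparkr-shell"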
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 7ff3fa628b..d2bfad5531 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -86,13 +86,13 @@ sparkR.stop <- function() {
#' and use SparkR, refer to SparkR programming guide at
#' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparkcontext-sqlcontext}.
#'
-#' @param master The Spark master URL.
+#' @param master The Spark master URL
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
-#' @param sparkEnvir Named list of environment variables to set on worker nodes.
-#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
-#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
-#' @param sparkPackages Character string vector of packages from spark-packages.org
+#' @param sparkEnvir Named list of environment variables to set on worker nodes
+#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors
+#' @param sparkJars Character vector of jar files to pass to the worker nodes
+#' @param sparkPackages Character vector of packages from spark-packages.org
#' @export
#' @examples
#'\dontrun{
@@ -102,7 +102,9 @@ sparkR.stop <- function() {
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#' list(spark.executor.memory="4g"),
#' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
-#' c("jarfile1.jar","jarfile2.jar"))
+#' c("one.jar", "two.jar", "three.jar"),
+#' c("com.databricks:spark-avro_2.10:2.0.1",
+#' "com.databricks:spark-csv_2.10:1.3.0"))
#'}
sparkR.init <- function(
@@ -120,15 +122,8 @@ sparkR.init <- function(
return(get(".sparkRjsc", envir = .sparkREnv))
}
- jars <- suppressWarnings(normalizePath(as.character(sparkJars)))
-
- # Classpath separator is ";" on Windows
- # URI needs four /// as from http://stackoverflow.com/a/18522792
- if (.Platform$OS.type == "unix") {
- uriSep <- "//"
- } else {
- uriSep <- "////"
- }
+ jars <- processSparkJars(sparkJars)
+ packages <- processSparkPackages(sparkPackages)
sparkEnvirMap <- convertNamedListToEnv(sparkEnvir)
@@ -145,7 +140,7 @@ sparkR.init <- function(
sparkHome = sparkHome,
jars = jars,
sparkSubmitOpts = submitOps,
- packages = sparkPackages)
+ packages = packages)
# wait atmost 100 seconds for JVM to launch
wait <- 0.1
for (i in 1:25) {
@@ -195,8 +190,14 @@ sparkR.init <- function(
paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
}
- nonEmptyJars <- Filter(function(x) { x != "" }, jars)
- localJarPaths <- lapply(nonEmptyJars,
+ # Classpath separator is ";" on Windows
+ # URI needs four /// as from http://stackoverflow.com/a/18522792
+ if (.Platform$OS.type == "unix") {
+ uriSep <- "//"
+ } else {
+ uriSep <- "////"
+ }
+ localJarPaths <- lapply(jars,
function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
# Set the start time to identify jobjs
@@ -366,3 +367,22 @@ getClientModeSparkSubmitOpts <- function(submitOps, sparkEnvirMap) {
# --option must be before the application class "sparkr-shell" in submitOps
paste0(paste0(envirToOps, collapse = ""), submitOps)
}
+
+# Utility function that handles the sparkJars argument and normalizes paths
+processSparkJars <- function(jars) {
+ splittedJars <- splitString(jars)
+ if (length(splittedJars) > length(jars)) {
+ warning("sparkJars as a comma-separated string is deprecated, use character vector instead")
+ }
+ normalized <- suppressWarnings(normalizePath(splittedJars))
+ normalized
+}
+
+# Utility function that handles sparkPackages argument
+processSparkPackages <- function(packages) {
+ splittedPackages <- splitString(packages)
+ if (length(splittedPackages) > length(packages)) {
+ warning("sparkPackages as a comma-separated string is deprecated, use character vector instead")
+ }
+ splittedPackages
+}
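Roughly, using hypothetical package coordinates, the deprecation path behaves as follows:

  processSparkPackages("pkgA,pkgB")          # warns, returns c("pkgA", "pkgB")
  processSparkPackages(c("pkgA", "pkgB"))    # no warning, returns c("pkgA", "pkgB")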
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 45c77a86c9..43105aaa38 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -636,3 +636,8 @@ assignNewEnv <- function(data) {
}
env
}
+
+# Utility function to split by ',' and whitespace, remove empty tokens
+splitString <- function(input) {
+ Filter(nzchar, unlist(strsplit(input, ",|\\s")))
+}
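For example (inputs are illustrative), splitString() tokenizes on commas and whitespace and drops empty tokens, which is what lets both character vectors and comma-separated strings flow through the helpers above:

  splitString(" a, b ,,c ")                      # c("a", "b", "c")
  splitString(c("x.jar, y.jar", "", "z.jar"))    # c("x.jar", "y.jar", "z.jar")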
diff --git a/R/pkg/inst/tests/test_client.R b/R/pkg/inst/tests/test_client.R
index 8a20991f89..a0664f32f3 100644
--- a/R/pkg/inst/tests/test_client.R
+++ b/R/pkg/inst/tests/test_client.R
@@ -34,3 +34,12 @@ test_that("no package specified doesn't add packages flag", {
test_that("multiple packages don't produce a warning", {
expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning()))
})
+
+test_that("sparkJars sparkPackages as character vectors", {
+ args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
+ c("com.databricks:spark-avro_2.10:2.0.1",
+ "com.databricks:spark-csv_2.10:1.3.0"))
+ expect_match(args, "--jars one.jar,two.jar,three.jar")
+ expect_match(args,
+ "--packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.3.0")
+})
diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R
index 80c1b89a4c..1707e314be 100644
--- a/R/pkg/inst/tests/test_context.R
+++ b/R/pkg/inst/tests/test_context.R
@@ -92,3 +92,23 @@ test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whiteli
" --driver-memory 4g sparkr-shell2"))
# nolint end
})
+
+test_that("sparkJars sparkPackages as comma-separated strings", {
+ expect_warning(processSparkJars(" a, b "))
+ jars <- suppressWarnings(processSparkJars(" a, b "))
+ expect_equal(jars, c("a", "b"))
+
+ jars <- suppressWarnings(processSparkJars(" abc ,, def "))
+ expect_equal(jars, c("abc", "def"))
+
+ jars <- suppressWarnings(processSparkJars(c(" abc ,, def ", "", "xyz", " ", "a,b")))
+ expect_equal(jars, c("abc", "def", "xyz", "a", "b"))
+
+ p <- processSparkPackages(c("ghi", "lmn"))
+ expect_equal(p, c("ghi", "lmn"))
+
+ # check normalizePath
+ f <- dir()[[1]]
+ expect_that(processSparkJars(f), not(gives_warning()))
+ expect_match(processSparkJars(f), f)
+})