aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorHolden Karau <holden@pigscanfly.ca>2015-06-24 11:55:20 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-06-24 11:55:20 -0700
commit43e66192f45a23f7232116e9f664158862df5015 (patch)
tree2fd464e8c2c997397a3a05abb0bf0f7d62031c33 /R
parent1173483f3f465a4c63246e83d0aaa2af521395f5 (diff)
downloadspark-43e66192f45a23f7232116e9f664158862df5015.tar.gz
spark-43e66192f45a23f7232116e9f664158862df5015.tar.bz2
spark-43e66192f45a23f7232116e9f664158862df5015.zip
[SPARK-8506] Add pakages to R context created through init.
Author: Holden Karau <holden@pigscanfly.ca> Closes #6928 from holdenk/SPARK-8506-sparkr-does-not-provide-an-easy-way-to-depend-on-spark-packages-when-performing-init-from-inside-of-r and squashes the following commits: b60dd63 [Holden Karau] Add an example with the spark-csv package fa8bc92 [Holden Karau] typo: sparm -> spark 865a90c [Holden Karau] strip spaces for comparision c7a4471 [Holden Karau] Add some documentation c1a9233 [Holden Karau] refactor for testing c818556 [Holden Karau] Add pakages to R
Diffstat (limited to 'R')
-rw-r--r--R/pkg/R/client.R26
-rw-r--r--R/pkg/R/sparkR.R7
-rw-r--r--R/pkg/inst/tests/test_client.R32
3 files changed, 56 insertions, 9 deletions
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index 1281c41213..cf2e5ddeb7 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -34,24 +34,36 @@ connectBackend <- function(hostname, port, timeout = 6000) {
con
}
-launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts) {
+determineSparkSubmitBin <- function() {
if (.Platform$OS.type == "unix") {
sparkSubmitBinName = "spark-submit"
} else {
sparkSubmitBinName = "spark-submit.cmd"
}
+ sparkSubmitBinName
+}
+
+generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
+ if (jars != "") {
+ jars <- paste("--jars", jars)
+ }
+
+ if (packages != "") {
+ packages <- paste("--packages", packages)
+ }
+ combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ")
+ combinedArgs
+}
+
+launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
+ sparkSubmitBin <- determineSparkSubmitBin()
if (sparkHome != "") {
sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
} else {
sparkSubmitBin <- sparkSubmitBinName
}
-
- if (jars != "") {
- jars <- paste("--jars", jars)
- }
-
- combinedArgs <- paste(jars, sparkSubmitOpts, args, sep = " ")
+ combinedArgs <- generateSparkSubmitArgs(args, sparkHome, jars, sparkSubmitOpts, packages)
cat("Launching java with spark-submit command", sparkSubmitBin, combinedArgs, "\n")
invisible(system2(sparkSubmitBin, combinedArgs, wait = F))
}
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index dbde0c44c5..8f81d5640c 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -81,6 +81,7 @@ sparkR.stop <- function() {
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir The path where R is installed on the worker nodes.
+#' @param sparkPackages Character string vector of packages from spark-packages.org
#' @export
#' @examples
#'\dontrun{
@@ -100,7 +101,8 @@ sparkR.init <- function(
sparkEnvir = list(),
sparkExecutorEnv = list(),
sparkJars = "",
- sparkRLibDir = "") {
+ sparkRLibDir = "",
+ sparkPackages = "") {
if (exists(".sparkRjsc", envir = .sparkREnv)) {
cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
@@ -129,7 +131,8 @@ sparkR.init <- function(
args = path,
sparkHome = sparkHome,
jars = jars,
- sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"))
+ sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
+ sparkPackages = sparkPackages)
# wait atmost 100 seconds for JVM to launch
wait <- 0.1
for (i in 1:25) {
diff --git a/R/pkg/inst/tests/test_client.R b/R/pkg/inst/tests/test_client.R
new file mode 100644
index 0000000000..30b05c1a2a
--- /dev/null
+++ b/R/pkg/inst/tests/test_client.R
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions in client.R")
+
+test_that("adding spark-testing-base as a package works", {
+ args <- generateSparkSubmitArgs("", "", "", "",
+ "holdenk:spark-testing-base:1.3.0_0.0.5")
+ expect_equal(gsub("[[:space:]]", "", args),
+ gsub("[[:space:]]", "",
+ "--packages holdenk:spark-testing-base:1.3.0_0.0.5"))
+})
+
+test_that("no package specified doesn't add packages flag", {
+ args <- generateSparkSubmitArgs("", "", "", "", "")
+ expect_equal(gsub("[[:space:]]", "", args),
+ "")
+})