author     Yanbo Liang <ybliang8@gmail.com>   2016-09-21 20:08:28 -0700
committer  Yanbo Liang <ybliang8@gmail.com>   2016-09-21 20:08:28 -0700
commit     c133907c5d9a6e6411b896b5e0cff48b2beff09f (patch)
tree       f19d91c861860737b06b0fae0118ce43094cbebe /R
parent     7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0 (diff)
[SPARK-17577][SPARKR][CORE] SparkR support add files to Spark job and get by executors
## What changes were proposed in this pull request?

Scala/Python users can add files to a Spark job via the submit option ```--files``` or ```SparkContext.addFile()```, and can then retrieve an added file with ```SparkFiles.get(filename)```. We should support the same functionality for SparkR users, since they also need to share dependency files. For example, SparkR users can first download third-party R packages to the driver, add these files to the Spark job as dependencies through this API, and then each executor can install the packages with ```install.packages```.

## How was this patch tested?

Added a unit test.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #15131 from yanboliang/spark-17577.
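As an illustration, here is a minimal sketch of the workflow this enables, assuming a running SparkR session; the file name and contents are made up for demonstration:

```r
library(SparkR)
sparkR.session()

# Create a small dependency file on the driver (hypothetical example file).
dep <- file.path(tempdir(), "deps.txt")
writeLines("Hello World!", dep)

# Ship the file with the Spark job ...
spark.addFile(dep)

# ... and resolve its download location by name, as the new unit test also does.
readLines(spark.getSparkFiles(basename(dep)))

# The same lookup is intended to work in code executed on the workers
# (e.g. inside spark.lapply()), so each executor can install.packages()
# a shipped package tarball before using it.
```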
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                            |  3
-rw-r--r--  R/pkg/R/context.R                          | 48
-rw-r--r--  R/pkg/inst/tests/testthat/test_context.R   | 13
3 files changed, 64 insertions(+), 0 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a5e9cbdc37..267a38c215 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -336,6 +336,9 @@ export("as.DataFrame",
"read.parquet",
"read.text",
"spark.lapply",
+ "spark.addFile",
+ "spark.getSparkFilesRootDirectory",
+ "spark.getSparkFiles",
"sql",
"str",
"tableToDF",
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 13ade49eab..4793578ad6 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -225,6 +225,54 @@ setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}
+#' Add a file or directory to be downloaded with this Spark job on every node.
+#'
+#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported
+#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+#' use spark.getSparkFiles(fileName) to find its download location.
+#'
+#' @rdname spark.addFile
+#' @param path The path of the file to be added
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.addFile("~/myfile")
+#'}
+#' @note spark.addFile since 2.1.0
+spark.addFile <- function(path) {
+ sc <- getSparkContext()
+ invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path))))
+}
+
+#' Get the root directory that contains files added through spark.addFile.
+#'
+#' @rdname spark.getSparkFilesRootDirectory
+#' @return the root directory that contains files added through spark.addFile
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.getSparkFilesRootDirectory()
+#'}
+#' @note spark.getSparkFilesRootDirectory since 2.1.0
+spark.getSparkFilesRootDirectory <- function() {
+ callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+}
+
+#' Get the absolute path of a file added through spark.addFile.
+#'
+#' @rdname spark.getSparkFiles
+#' @param fileName The name of the file added through spark.addFile
+#' @return the absolute path of a file added through spark.addFile.
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.getSparkFiles("myfile")
+#'}
+#' @note spark.getSparkFiles since 2.1.0
+spark.getSparkFiles <- function(fileName) {
+ callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+}
+
#' Run a function over a list of elements, distributing the computations with Spark
#'
#' Run a function over a list of elements, distributing the computations with Spark. Applies a
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 1ab7f319df..0495418bb7 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -166,3 +166,16 @@ test_that("spark.lapply should perform simple transforms", {
expect_equal(doubled, as.list(2 * 1:10))
sparkR.session.stop()
})
+
+test_that("add and get file to be downloaded with Spark job on every node", {
+ sparkR.sparkContext()
+ path <- tempfile(pattern = "hello", fileext = ".txt")
+ filename <- basename(path)
+ words <- "Hello World!"
+ writeLines(words, path)
+ spark.addFile(path)
+ download_path <- spark.getSparkFiles(filename)
+ expect_equal(readLines(download_path), words)
+ unlink(path)
+ sparkR.session.stop()
+})
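
For reference, a hedged sketch of how the two new getters relate, in a session like the test above; the file name here is hypothetical:

```r
# In a SparkR session where spark.addFile("hello.txt") has already been called
# (file name hypothetical):
root <- spark.getSparkFilesRootDirectory()
path <- spark.getSparkFiles("hello.txt")
dirname(path) == root  # expected TRUE: getSparkFiles() resolves names under the root directory
```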