From c133907c5d9a6e6411b896b5e0cff48b2beff09f Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 21 Sep 2016 20:08:28 -0700 Subject: [SPARK-17577][SPARKR][CORE] SparkR support add files to Spark job and get by executors ## What changes were proposed in this pull request? Scala/Python users can add files to Spark job by submit options ```--files``` or ```SparkContext.addFile()```. Meanwhile, users can get the added file by ```SparkFiles.get(filename)```. We should also support this function for SparkR users, since they also have the requirements for some shared dependency files. For example, SparkR users can download third party R packages to driver firstly, add these files to the Spark job as dependency by this API and then each executor can install these packages by ```install.packages```. ## How was this patch tested? Add unit test. Author: Yanbo Liang Closes #15131 from yanboliang/spark-17577. --- R/pkg/NAMESPACE | 3 ++ R/pkg/R/context.R | 48 ++++++++++++++++++++++++++++++++ R/pkg/inst/tests/testthat/test_context.R | 13 +++++++++ 3 files changed, 64 insertions(+) (limited to 'R') diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index a5e9cbdc37..267a38c215 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -336,6 +336,9 @@ export("as.DataFrame", "read.parquet", "read.text", "spark.lapply", + "spark.addFile", + "spark.getSparkFilesRootDirectory", + "spark.getSparkFiles", "sql", "str", "tableToDF", diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 13ade49eab..4793578ad6 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -225,6 +225,54 @@ setCheckpointDir <- function(sc, dirName) { invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName)))) } +#' Add a file or directory to be downloaded with this Spark job on every node. +#' +#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported +#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, +#' use spark.getSparkFiles(fileName) to find its download location. +#' +#' @rdname spark.addFile +#' @param path The path of the file to be added +#' @export +#' @examples +#'\dontrun{ +#' spark.addFile("~/myfile") +#'} +#' @note spark.addFile since 2.1.0 +spark.addFile <- function(path) { + sc <- getSparkContext() + invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)))) +} + +#' Get the root directory that contains files added through spark.addFile. +#' +#' @rdname spark.getSparkFilesRootDirectory +#' @return the root directory that contains files added through spark.addFile +#' @export +#' @examples +#'\dontrun{ +#' spark.getSparkFilesRootDirectory() +#'} +#' @note spark.getSparkFilesRootDirectory since 2.1.0 +spark.getSparkFilesRootDirectory <- function() { + callJStatic("org.apache.spark.SparkFiles", "getRootDirectory") +} + +#' Get the absolute path of a file added through spark.addFile. +#' +#' @rdname spark.getSparkFiles +#' @param fileName The name of the file added through spark.addFile +#' @return the absolute path of a file added through spark.addFile. +#' @export +#' @examples +#'\dontrun{ +#' spark.getSparkFiles("myfile") +#'} +#' @note spark.getSparkFiles since 2.1.0 +spark.getSparkFiles <- function(fileName) { + callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName)) +} + #' Run a function over a list of elements, distributing the computations with Spark #' #' Run a function over a list of elements, distributing the computations with Spark. Applies a diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 1ab7f319df..0495418bb7 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -166,3 +166,16 @@ test_that("spark.lapply should perform simple transforms", { expect_equal(doubled, as.list(2 * 1:10)) sparkR.session.stop() }) + +test_that("add and get file to be downloaded with Spark job on every node", { + sparkR.sparkContext() + path <- tempfile(pattern = "hello", fileext = ".txt") + filename <- basename(path) + words <- "Hello World!" + writeLines(words, path) + spark.addFile(path) + download_path <- spark.getSparkFiles(filename) + expect_equal(readLines(download_path), words) + unlink(path) + sparkR.session.stop() +}) -- cgit v1.2.3