aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2016-09-21 20:08:28 -0700
committerYanbo Liang <ybliang8@gmail.com>2016-09-21 20:08:28 -0700
commitc133907c5d9a6e6411b896b5e0cff48b2beff09f (patch)
treef19d91c861860737b06b0fae0118ce43094cbebe
parent7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0 (diff)
downloadspark-c133907c5d9a6e6411b896b5e0cff48b2beff09f.tar.gz
spark-c133907c5d9a6e6411b896b5e0cff48b2beff09f.tar.bz2
spark-c133907c5d9a6e6411b896b5e0cff48b2beff09f.zip
[SPARK-17577][SPARKR][CORE] SparkR support add files to Spark job and get by executors
## What changes were proposed in this pull request? Scala/Python users can add files to Spark job by submit options ```--files``` or ```SparkContext.addFile()```. Meanwhile, users can get the added file by ```SparkFiles.get(filename)```. We should also support this function for SparkR users, since they also have the requirements for some shared dependency files. For example, SparkR users can download third party R packages to driver firstly, add these files to the Spark job as dependency by this API and then each executor can install these packages by ```install.packages```. ## How was this patch tested? Add unit test. Author: Yanbo Liang <ybliang8@gmail.com> Closes #15131 from yanboliang/spark-17577.
-rw-r--r--R/pkg/NAMESPACE3
-rw-r--r--R/pkg/R/context.R48
-rw-r--r--R/pkg/inst/tests/testthat/test_context.R13
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala6
4 files changed, 67 insertions, 3 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a5e9cbdc37..267a38c215 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -336,6 +336,9 @@ export("as.DataFrame",
"read.parquet",
"read.text",
"spark.lapply",
+ "spark.addFile",
+ "spark.getSparkFilesRootDirectory",
+ "spark.getSparkFiles",
"sql",
"str",
"tableToDF",
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 13ade49eab..4793578ad6 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -225,6 +225,54 @@ setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}
+#' Add a file or directory to be downloaded with this Spark job on every node.
+#'
+#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported
+#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+#' use spark.getSparkFiles(fileName) to find its download location.
+#'
+#' @rdname spark.addFile
+#' @param path The path of the file to be added
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.addFile("~/myfile")
+#'}
+#' @note spark.addFile since 2.1.0
+spark.addFile <- function(path) {
+ sc <- getSparkContext()
+ invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path))))
+}
+
+#' Get the root directory that contains files added through spark.addFile.
+#'
+#' @rdname spark.getSparkFilesRootDirectory
+#' @return the root directory that contains files added through spark.addFile
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.getSparkFilesRootDirectory()
+#'}
+#' @note spark.getSparkFilesRootDirectory since 2.1.0
+spark.getSparkFilesRootDirectory <- function() {
+ callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+}
+
+#' Get the absolute path of a file added through spark.addFile.
+#'
+#' @rdname spark.getSparkFiles
+#' @param fileName The name of the file added through spark.addFile
+#' @return the absolute path of a file added through spark.addFile.
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.getSparkFiles("myfile")
+#'}
+#' @note spark.getSparkFiles since 2.1.0
+spark.getSparkFiles <- function(fileName) {
+ callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+}
+
#' Run a function over a list of elements, distributing the computations with Spark
#'
#' Run a function over a list of elements, distributing the computations with Spark. Applies a
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 1ab7f319df..0495418bb7 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -166,3 +166,16 @@ test_that("spark.lapply should perform simple transforms", {
expect_equal(doubled, as.list(2 * 1:10))
sparkR.session.stop()
})
+
+test_that("add and get file to be downloaded with Spark job on every node", {
+ sparkR.sparkContext()
+ path <- tempfile(pattern = "hello", fileext = ".txt")
+ filename <- basename(path)
+ words <- "Hello World!"
+ writeLines(words, path)
+ spark.addFile(path)
+ download_path <- spark.getSparkFiles(filename)
+ expect_equal(readLines(download_path), words)
+ unlink(path)
+ sparkR.session.stop()
+})
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index db84172e16..1981ad5671 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1427,7 +1427,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
- val uri = new URI(path)
+ val uri = new Path(path).toUri
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
@@ -1458,8 +1458,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
// SparkFiles API to access files.
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
- hadoopConfiguration, timestamp, useCache = false)
+ Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
+ env.securityManager, hadoopConfiguration, timestamp, useCache = false)
postEnvironmentUpdate()
}
}