about summary refs log tree commit diff
path: root/R
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2017-03-21 21:50:54 -0700
committerYanbo Liang <ybliang8@gmail.com>2017-03-21 21:50:54 -0700
commit478fbc866fbfdb4439788583281863ecea14e8af (patch)
tree5e2e46c53ec06403b06e086a983395a0bf4b8c32 /R
parentc1e87e384d1878308b42da80bb3d65be512aab55 (diff)
downloadspark-478fbc866fbfdb4439788583281863ecea14e8af.tar.gz
spark-478fbc866fbfdb4439788583281863ecea14e8af.tar.bz2
spark-478fbc866fbfdb4439788583281863ecea14e8af.zip
[SPARK-19925][SPARKR] Fix SparkR spark.getSparkFiles failing when called on executors.
## What changes were proposed in this pull request?

SparkR ```spark.getSparkFiles``` fails when called on executors; see details at [SPARK-19925](https://issues.apache.org/jira/browse/SPARK-19925).

## How was this patch tested?

Added unit tests, and verified this fix on standalone and YARN clusters.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #17274 from yanboliang/spark-19925.
Diffstat (limited to 'R')
-rw-r--r--R/pkg/R/context.R16
-rw-r--r--R/pkg/inst/tests/testthat/test_context.R7
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1ca573e5bd..50856e3d98 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -330,7 +330,13 @@ spark.addFile <- function(path, recursive = FALSE) {
#'}
#' @note spark.getSparkFilesRootDirectory since 2.1.0
spark.getSparkFilesRootDirectory <- function() {
- callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+ if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
+ # Running on driver.
+ callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+ } else {
+ # Running on worker.
+ Sys.getenv("SPARKR_SPARKFILES_ROOT_DIR")
+ }
}
#' Get the absolute path of a file added through spark.addFile.
@@ -345,7 +351,13 @@ spark.getSparkFilesRootDirectory <- function() {
#'}
#' @note spark.getSparkFiles since 2.1.0
spark.getSparkFiles <- function(fileName) {
- callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+ if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
+ # Running on driver.
+ callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+ } else {
+ # Running on worker.
+ file.path(spark.getSparkFilesRootDirectory(), as.character(fileName))
+ }
}
#' Run a function over a list of elements, distributing the computations with Spark
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index caca069339..c847113491 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -177,6 +177,13 @@ test_that("add and get file to be downloaded with Spark job on every node", {
spark.addFile(path)
download_path <- spark.getSparkFiles(filename)
expect_equal(readLines(download_path), words)
+
+ # Test spark.getSparkFiles works well on executors.
+ seq <- seq(from = 1, to = 10, length.out = 5)
+ f <- function(seq) { spark.getSparkFiles(filename) }
+ results <- spark.lapply(seq, f)
+ for (i in 1:5) { expect_equal(basename(results[[i]]), filename) }
+
unlink(path)
# Test add directory recursively.