From 93c743f1aca433144611b11d4e1b169d66e0f57b Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Mon, 26 Sep 2016 16:47:57 -0700
Subject: [SPARK-17577][FOLLOW-UP][SPARKR] SparkR spark.addFile supports adding
 directory recursively

## What changes were proposed in this pull request?

#15140 exposed ```JavaSparkContext.addFile(path: String, recursive: Boolean)``` to Python/R, so we can now update SparkR ```spark.addFile``` to support adding directories recursively.

## How was this patch tested?

Added a unit test.

Author: Yanbo Liang

Closes #15216 from yanboliang/spark-17577-2.
---
 R/pkg/R/context.R                        |  9 +++++++--
 R/pkg/inst/tests/testthat/test_context.R | 22 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 4793578ad6..fe2f3e3d10 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -231,17 +231,22 @@ setCheckpointDir <- function(sc, dirName) {
 #' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
 #' use spark.getSparkFiles(fileName) to find its download location.
 #'
+#' A directory can be given if the recursive option is set to TRUE.
+#' Currently directories are only supported for Hadoop-supported filesystems.
+#' Refer to Hadoop-supported filesystems at \url{https://wiki.apache.org/hadoop/HCFS}.
+#'
 #' @rdname spark.addFile
 #' @param path The path of the file to be added
+#' @param recursive Whether to add files recursively from the path. Default is FALSE.
 #' @export
 #' @examples
 #'\dontrun{
 #' spark.addFile("~/myfile")
 #'}
 #' @note spark.addFile since 2.1.0
-spark.addFile <- function(path) {
+spark.addFile <- function(path, recursive = FALSE) {
   sc <- getSparkContext()
-  invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path))))
+  invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)), recursive))
 }
 
 #' Get the root directory that contains files added through spark.addFile.
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 0495418bb7..caca069339 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -169,6 +169,7 @@ test_that("spark.lapply should perform simple transforms", {
 
 test_that("add and get file to be downloaded with Spark job on every node", {
   sparkR.sparkContext()
+  # Test add file.
   path <- tempfile(pattern = "hello", fileext = ".txt")
   filename <- basename(path)
   words <- "Hello World!"
@@ -177,5 +178,26 @@ test_that("add and get file to be downloaded with Spark job on every node", {
   download_path <- spark.getSparkFiles(filename)
   expect_equal(readLines(download_path), words)
   unlink(path)
+
+  # Test add directory recursively.
+  path <- paste0(tempdir(), "/", "recursive_dir")
+  dir.create(path)
+  dir_name <- basename(path)
+  path1 <- paste0(path, "/", "hello.txt")
+  file.create(path1)
+  sub_path <- paste0(path, "/", "sub_hello")
+  dir.create(sub_path)
+  path2 <- paste0(sub_path, "/", "sub_hello.txt")
+  file.create(path2)
+  words <- "Hello World!"
+  sub_words <- "Sub Hello World!"
+  writeLines(words, path1)
+  writeLines(sub_words, path2)
+  spark.addFile(path, recursive = TRUE)
+  download_path1 <- spark.getSparkFiles(paste0(dir_name, "/", "hello.txt"))
+  expect_equal(readLines(download_path1), words)
+  download_path2 <- spark.getSparkFiles(paste0(dir_name, "/", "sub_hello/sub_hello.txt"))
+  expect_equal(readLines(download_path2), sub_words)
+  unlink(path, recursive = TRUE)
   sparkR.session.stop()
 })
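For illustration (not part of the patch): a minimal sketch of how the extended API can be used from an R session once this change is applied. The directory layout and file names (`mydir`, `greeting.txt`) are hypothetical, chosen only to demonstrate the new `recursive` parameter.

```r
library(SparkR)
sparkR.session()

# Build a small directory tree to distribute (hypothetical layout).
dir <- file.path(tempdir(), "mydir")
dir.create(file.path(dir, "sub"), recursive = TRUE)
writeLines("Hello World!", file.path(dir, "sub", "greeting.txt"))

# New in this patch: recursive = TRUE ships the whole directory to every node.
spark.addFile(dir, recursive = TRUE)

# Files are resolved relative to the added directory's base name.
spark.getSparkFiles(file.path(basename(dir), "sub", "greeting.txt"))

sparkR.session.stop()
```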