author     hyukjinkwon <gurwls223@gmail.com>        2016-10-07 11:34:49 -0700
committer  Felix Cheung <felixcheung@apache.org>    2016-10-07 11:34:49 -0700
commit     9d8ae853ecc5600f5c2f69565b96d5c46a8c0048 (patch)
tree       c3f57c9401be83b54eaa96f3fa4018fa527da3d5
parent     bb1aaf28eca6d9ae9af664ac3ad35cafdfc01a3b (diff)
[SPARK-17665][SPARKR] Support options/mode all for read/write APIs and options in other types
## What changes were proposed in this pull request?

This PR includes the changes below:

- Support `mode`/`options` in the `read.parquet`, `write.parquet`, `read.orc`, `write.orc`, `read.text`, `write.text`, `read.json` and `write.json` APIs
- Support other types (logical, numeric and string) as options for `write.df`, `read.df`, `read.parquet`, `write.parquet`, `read.orc`, `write.orc`, `read.text`, `write.text`, `read.json` and `write.json`

## How was this patch tested?

Unit tests in `test_sparkSQL.R` / `test_utils.R`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #15239 from HyukjinKwon/SPARK-17665.
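For context, a minimal usage sketch of the API surface this patch adds. The session setup and all file paths are illustrative assumptions, not part of the patch:

```r
# Sketch of the new options/mode support, assuming an active SparkR session
# and hypothetical input/output paths.
library(SparkR)
sparkR.session()

df <- read.df("examples/src/main/resources/people.json", "json")

# Type-specific writers now accept a save mode and data source options:
write.parquet(df, "/tmp/people_parquet", mode = "overwrite", compression = "gzip")
write.json(df, "/tmp/people_json", mode = "error", compression = "gzip")

# Type-specific readers forward named options to the data source as well:
jsonDF <- read.json("/tmp/people_json")

# Options may now be logical or numeric, not only strings; they are
# converted to strings internally (TRUE becomes "true"):
csvDF <- read.df("/tmp/people.csv", "csv", header = TRUE, inferSchema = TRUE)
```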
-rw-r--r--  R/pkg/R/DataFrame.R                         43
-rw-r--r--  R/pkg/R/SQLContext.R                        23
-rw-r--r--  R/pkg/R/generics.R                          10
-rw-r--r--  R/pkg/R/utils.R                             22
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R   75
-rw-r--r--  R/pkg/inst/tests/testthat/test_utils.R       9
6 files changed, 160 insertions, 22 deletions
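Most of the option handling in the diff below funnels through the new `varargsToStrEnv()` helper added to `R/pkg/R/utils.R`. The following sketch restates its conversion rule as a standalone function for illustration only; `coerceOptionValue` is a hypothetical name and not part of the patch:

```r
# Rough standalone restatement of the value coercion performed per option
# (illustrative; the real helper is varargsToStrEnv in R/pkg/R/utils.R).
coerceOptionValue <- function(name, value) {
  if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
    stop(paste0("Unsupported type for ", name, " : ", class(value),
                ". Supported types are logical, numeric, character and NULL."))
  }
  if (is.logical(value)) {
    tolower(as.character(value))   # TRUE -> "true", as expected by the JVM side
  } else if (is.null(value)) {
    value                          # NULL passes through unchanged
  } else {
    as.character(value)            # numerics and characters become strings
  }
}

coerceOptionValue("header", TRUE)    # "true"
coerceOptionValue("maxColumns", 3)   # "3"
```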
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 75861d5de7..801d2ed4e7 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
.Object
})
+#' Set options/mode and then return the write object
+#' @noRd
+setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
+ options <- varargsToStrEnv(...)
+ if (!is.null(path)) {
+ options[["path"]] <- path
+ }
+ jmode <- convertToJSaveMode(mode)
+ write <- callJMethod(write, "mode", jmode)
+ write <- callJMethod(write, "options", options)
+ write
+}
+
#' @export
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the SparkDataFrame is cached
@@ -727,6 +740,8 @@ setMethod("toJSON",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.json
@@ -743,8 +758,9 @@ setMethod("toJSON",
#' @note write.json since 1.6.0
setMethod("write.json",
signature(x = "SparkDataFrame", path = "character"),
- function(x, path) {
+ function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
+ write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "json", path))
})
@@ -755,6 +771,8 @@ setMethod("write.json",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.orc,SparkDataFrame,character-method
@@ -771,8 +789,9 @@ setMethod("write.json",
#' @note write.orc since 2.0.0
setMethod("write.orc",
signature(x = "SparkDataFrame", path = "character"),
- function(x, path) {
+ function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
+ write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "orc", path))
})
@@ -783,6 +802,8 @@ setMethod("write.orc",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.parquet
@@ -800,8 +821,9 @@ setMethod("write.orc",
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
signature(x = "SparkDataFrame", path = "character"),
- function(x, path) {
+ function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
+ write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "parquet", path))
})
@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.text,SparkDataFrame,character-method
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile",
#' @note write.text since 2.0.0
setMethod("write.text",
signature(x = "SparkDataFrame", path = "character"),
- function(x, path) {
+ function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
+ write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "text", path))
})
@@ -2637,15 +2662,9 @@ setMethod("write.df",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
- jmode <- convertToJSaveMode(mode)
- options <- varargsToEnv(...)
- if (!is.null(path)) {
- options[["path"]] <- path
- }
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
- write <- callJMethod(write, "mode", jmode)
- write <- callJMethod(write, "options", options)
+ write <- setWriteOptions(write, path = path, mode = mode, ...)
write <- handledCallJMethod(write, "save")
})
@@ -2701,7 +2720,7 @@ setMethod("saveAsTable",
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
- options <- varargsToEnv(...)
+ options <- varargsToStrEnv(...)
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index baa87824be..0d6a229e63 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"),
#' It goes through the entire dataset once to determine the schema.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.json
#' @export
@@ -341,11 +342,13 @@ setMethod("toDF", signature(x = "RDD"),
#' @name read.json
#' @method read.json default
#' @note read.json since 1.6.0
-read.json.default <- function(path) {
+read.json.default <- function(path, ...) {
sparkSession <- getSparkSession()
+ options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definiton of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
+ read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "json", paths)
dataFrame(sdf)
}
@@ -405,16 +408,19 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
#' Loads an ORC file, returning the result as a SparkDataFrame.
#'
#' @param path Path of file to read.
+#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.orc
#' @export
#' @name read.orc
#' @note read.orc since 2.0.0
-read.orc <- function(path) {
+read.orc <- function(path, ...) {
sparkSession <- getSparkSession()
+ options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definiton of the ORC file path
path <- suppressWarnings(normalizePath(path))
read <- callJMethod(sparkSession, "read")
+ read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "orc", path)
dataFrame(sdf)
}
@@ -430,11 +436,13 @@ read.orc <- function(path) {
#' @name read.parquet
#' @method read.parquet default
#' @note read.parquet since 1.6.0
-read.parquet.default <- function(path) {
+read.parquet.default <- function(path, ...) {
sparkSession <- getSparkSession()
+ options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definiton of the Parquet file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
+ read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "parquet", paths)
dataFrame(sdf)
}
@@ -467,6 +475,7 @@ parquetFile <- function(x, ...) {
#' Each line in the text file is a new row in the resulting SparkDataFrame.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.text
#' @export
@@ -479,11 +488,13 @@ parquetFile <- function(x, ...) {
#' @name read.text
#' @method read.text default
#' @note read.text since 1.6.1
-read.text.default <- function(path) {
+read.text.default <- function(path, ...) {
sparkSession <- getSparkSession()
+ options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definiton of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
+ read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "text", paths)
dataFrame(sdf)
}
@@ -779,7 +790,7 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
"in 'spark.sql.sources.default' configuration by default.")
}
sparkSession <- getSparkSession()
- options <- varargsToEnv(...)
+ options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
@@ -842,7 +853,7 @@ loadDF <- function(x = NULL, ...) {
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
sparkSession <- getSparkSession()
- options <- varargsToEnv(...)
+ options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 90a02e2778..810aea9017 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -651,15 +651,17 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
#' @rdname write.json
#' @export
-setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
+setGeneric("write.json", function(x, path, ...) { standardGeneric("write.json") })
#' @rdname write.orc
#' @export
-setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
+setGeneric("write.orc", function(x, path, ...) { standardGeneric("write.orc") })
#' @rdname write.parquet
#' @export
-setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
+setGeneric("write.parquet", function(x, path, ...) {
+ standardGeneric("write.parquet")
+})
#' @rdname write.parquet
#' @export
@@ -667,7 +669,7 @@ setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParqu
#' @rdname write.text
#' @export
-setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })
#' @rdname schema
#' @export
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index e696664534..fa8bb0f79c 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -334,6 +334,28 @@ varargsToEnv <- function(...) {
env
}
+# Utility function to capture the varargs into environment object but all values are converted
+# into string.
+varargsToStrEnv <- function(...) {
+ pairs <- list(...)
+ env <- new.env()
+ for (name in names(pairs)) {
+ value <- pairs[[name]]
+ if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
+ stop(paste0("Unsupported type for ", name, " : ", class(value),
+ ". Supported types are logical, numeric, character and NULL."))
+ }
+ if (is.logical(value)) {
+ env[[name]] <- tolower(as.character(value))
+ } else if (is.null(value)) {
+ env[[name]] <- value
+ } else {
+ env[[name]] <- as.character(value)
+ }
+ }
+ env
+}
+
getStorageLevel <- function(newLevel = c("DISK_ONLY",
"DISK_ONLY_2",
"MEMORY_AND_DISK",
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index f5ab601f27..6d8cfad5c1 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -256,6 +256,23 @@ test_that("read/write csv as DataFrame", {
unlink(csvPath2)
})
+test_that("Support other types for options", {
+ csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
+ mockLinesCsv <- c("year,make,model,comment,blank",
+ "\"2012\",\"Tesla\",\"S\",\"No comment\",",
+ "1997,Ford,E350,\"Go get one now they are going fast\",",
+ "2015,Chevy,Volt",
+ "NA,Dummy,Placeholder")
+ writeLines(mockLinesCsv, csvPath)
+
+ csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
+ expected <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)
+ expect_equal(collect(csvDf), collect(expected))
+
+ expect_error(read.df(csvPath, "csv", header = TRUE, maxColumns = 3))
+ unlink(csvPath)
+})
+
test_that("convert NAs to null type in DataFrames", {
rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
df <- createDataFrame(rdd, list("a", "b"))
@@ -497,6 +514,19 @@ test_that("read/write json files", {
unlink(jsonPath3)
})
+test_that("read/write json files - compression option", {
+ df <- read.df(jsonPath, "json")
+
+ jsonPath <- tempfile(pattern = "jsonPath", fileext = ".json")
+ write.json(df, jsonPath, compression = "gzip")
+ jsonDF <- read.json(jsonPath)
+ expect_is(jsonDF, "SparkDataFrame")
+ expect_equal(count(jsonDF), count(df))
+ expect_true(length(list.files(jsonPath, pattern = ".gz")) > 0)
+
+ unlink(jsonPath)
+})
+
test_that("jsonRDD() on a RDD with json string", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
rdd <- parallelize(sc, mockLines)
@@ -1786,6 +1816,21 @@ test_that("read/write ORC files", {
unsetHiveContext()
})
+test_that("read/write ORC files - compression option", {
+ setHiveContext(sc)
+ df <- read.df(jsonPath, "json")
+
+ orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
+ write.orc(df, orcPath2, compression = "ZLIB")
+ orcDF <- read.orc(orcPath2)
+ expect_is(orcDF, "SparkDataFrame")
+ expect_equal(count(orcDF), count(df))
+ expect_true(length(list.files(orcPath2, pattern = ".zlib.orc")) > 0)
+
+ unlink(orcPath2)
+ unsetHiveContext()
+})
+
test_that("read/write Parquet files", {
df <- read.df(jsonPath, "json")
# Test write.df and read.df
@@ -1817,6 +1862,23 @@ test_that("read/write Parquet files", {
unlink(parquetPath4)
})
+test_that("read/write Parquet files - compression option/mode", {
+ df <- read.df(jsonPath, "json")
+ tempPath <- tempfile(pattern = "tempPath", fileext = ".parquet")
+
+ # Test write.df and read.df
+ write.parquet(df, tempPath, compression = "GZIP")
+ df2 <- read.parquet(tempPath)
+ expect_is(df2, "SparkDataFrame")
+ expect_equal(count(df2), 3)
+ expect_true(length(list.files(tempPath, pattern = ".gz.parquet")) > 0)
+
+ write.parquet(df, tempPath, mode = "overwrite")
+ df3 <- read.parquet(tempPath)
+ expect_is(df3, "SparkDataFrame")
+ expect_equal(count(df3), 3)
+})
+
test_that("read/write text files", {
# Test write.df and read.df
df <- read.df(jsonPath, "text")
@@ -1838,6 +1900,19 @@ test_that("read/write text files", {
unlink(textPath2)
})
+test_that("read/write text files - compression option", {
+ df <- read.df(jsonPath, "text")
+
+ textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+ write.text(df, textPath, compression = "GZIP")
+ textDF <- read.text(textPath)
+ expect_is(textDF, "SparkDataFrame")
+ expect_equal(count(textDF), count(df))
+ expect_true(length(list.files(textPath, pattern = ".gz")) > 0)
+
+ unlink(textPath)
+})
+
test_that("describe() and summarize() on a DataFrame", {
df <- read.json(jsonPath)
stats <- describe(df, "age")
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 69ed554916..a20254e9b3 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -217,4 +217,13 @@ test_that("rbindRaws", {
})
+test_that("varargsToStrEnv", {
+ strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd")
+ env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd")
+ expect_equal(strenv, env)
+ expect_error(varargsToStrEnv(a = list(1, "a")),
+ paste0("Unsupported type for a : list. Supported types are logical, ",
+ "numeric, character and NULL."))
+})
+
sparkR.session.stop()