author    Yanbo Liang <ybliang8@gmail.com>                 2015-12-16 10:34:30 -0800
committer Shivaram Venkataraman <shivaram@cs.berkeley.edu> 2015-12-16 10:34:30 -0800
commit    22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1 (patch)
tree      cc8e6e2e75599647daa850ae773f151b4753091e
parent    2eb5af5f0d3c424dc617bb1a18dd0210ea9ba0bc (diff)
[SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR
Add ```write.json``` and ```write.parquet``` for SparkR, and deprecate ```saveAsParquetFile```.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #10281 from yanboliang/spark-12310.
-rw-r--r--  R/pkg/NAMESPACE                            |   4
-rw-r--r--  R/pkg/R/DataFrame.R                        |  51
-rw-r--r--  R/pkg/R/generics.R                         |  16
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R  | 104
4 files changed, 119 insertions(+), 56 deletions(-)
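The net effect: SparkR gains write.json() and write.parquet() mirroring read.json() and read.parquet(), while saveAsParquetFile() survives as a deprecated alias. A minimal usage sketch, assembled from the roxygen examples in this patch (the paths are placeholders):

```r
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

df <- read.json(sqlContext, "path/to/file.json")

# New writers added by this patch; each takes (DataFrame, output directory)
write.json(df, "/tmp/sparkr-json/")       # one JSON object per line
write.parquet(df, "/tmp/sparkr-parquet/") # schema preserved in Parquet

# Old name still works but now emits a deprecation warning
suppressWarnings(saveAsParquetFile(df, "/tmp/sparkr-parquet-old/"))
```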
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cab39d68c3..ccc01fe169 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -92,7 +92,9 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
- "write.df")
+ "write.df",
+ "write.json",
+ "write.parquet")
exportClasses("Column")
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 380a13fe2b..0cfa12b997 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -596,17 +596,44 @@ setMethod("toJSON",
RDD(jrdd, serializedMode = "string")
})
-#' saveAsParquetFile
+#' write.json
+#'
+#' Save the contents of a DataFrame as a JSON file (one object per line). Files written out
+#' with this method can be read back in as a DataFrame using read.json().
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.json
+#' @name write.json
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
+#' write.json(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.json",
+ signature(x = "DataFrame", path = "character"),
+ function(x, path) {
+ write <- callJMethod(x@sdf, "write")
+ invisible(callJMethod(write, "json", path))
+ })
+
+#' write.parquet
#'
#' Save the contents of a DataFrame as a Parquet file, preserving the schema. Files written out
-#' with this method can be read back in as a DataFrame using parquetFile().
+#' with this method can be read back in as a DataFrame using read.parquet().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
-#' @rdname saveAsParquetFile
-#' @name saveAsParquetFile
+#' @rdname write.parquet
+#' @name write.parquet
#' @export
#' @examples
#'\dontrun{
@@ -614,12 +641,24 @@ setMethod("toJSON",
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
-#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
+#' write.parquet(df, "/tmp/sparkr-tmp1/")
+#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
#'}
+setMethod("write.parquet",
+ signature(x = "DataFrame", path = "character"),
+ function(x, path) {
+ write <- callJMethod(x@sdf, "write")
+ invisible(callJMethod(write, "parquet", path))
+ })
+
+#' @rdname write.parquet
+#' @name saveAsParquetFile
+#' @export
setMethod("saveAsParquetFile",
signature(x = "DataFrame", path = "character"),
function(x, path) {
- invisible(callJMethod(x@sdf, "saveAsParquetFile", path))
+ .Deprecated("write.parquet")
+ write.parquet(x, path)
})
#' Distinct
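The deprecation above leans on base R's .Deprecated(), which raises a warning and then lets the old entry point delegate to its replacement. A standalone sketch of that pattern with toy function names (not SparkR's):

```r
oldWriter <- function(x, path) {
  .Deprecated("newWriter")  # warns: 'oldWriter' is deprecated, use 'newWriter'
  newWriter(x, path)        # then forwards to the replacement
}
newWriter <- function(x, path) invisible(writeLines(as.character(x), path))

oldWriter(1:3, tempfile())                    # runs, with a one-time warning
suppressWarnings(oldWriter(1:3, tempfile()))  # how the tests silence it
```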
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index c383e6e78b..62be2ddc8f 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -519,10 +519,6 @@ setGeneric("sample_frac",
#' @export
setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") })
-#' @rdname saveAsParquetFile
-#' @export
-setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
-
#' @rdname saveAsTable
#' @export
setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
@@ -541,6 +537,18 @@ setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
#' @export
setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })
+#' @rdname write.json
+#' @export
+setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
+
+#' @rdname write.parquet
+#' @export
+setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
+
+#' @rdname write.parquet
+#' @export
+setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
+
#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 071fd310fd..135c7576e5 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
})
-test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
+test_that("read/write json files", {
+ # Test read.df
+ df <- read.df(sqlContext, jsonPath, "json")
+ expect_is(df, "DataFrame")
+ expect_equal(count(df), 3)
+
+ # Test read.df with a user defined schema
+ schema <- structType(structField("name", type = "string"),
+ structField("age", type = "double"))
+
+ df1 <- read.df(sqlContext, jsonPath, "json", schema)
+ expect_is(df1, "DataFrame")
+ expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
+
+ # Test loadDF
+ df2 <- loadDF(sqlContext, jsonPath, "json", schema)
+ expect_is(df2, "DataFrame")
+ expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
+
+ # Test read.json
df <- read.json(sqlContext, jsonPath)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
- # read.json()/jsonFile() works with multiple input paths
+
+ # Test write.df
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
write.df(df, jsonPath2, "json", mode="overwrite")
- jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))
+
+ # Test write.json
+ jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json")
+ write.json(df, jsonPath3)
+
+ # Test read.json()/jsonFile() works with multiple input paths
+ jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3))
expect_is(jsonDF1, "DataFrame")
expect_equal(count(jsonDF1), 6)
# Suppress warnings because jsonFile is deprecated
- jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
+ jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3)))
expect_is(jsonDF2, "DataFrame")
expect_equal(count(jsonDF2), 6)
unlink(jsonPath2)
+ unlink(jsonPath3)
})
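Each of the rewritten tests follows the same write, read back, compare counts, unlink round trip. The skeleton, as a sketch that assumes the live sqlContext and jsonPath fixture the suite sets up elsewhere:

```r
test_that("a writer round-trips", {
  df <- read.json(sqlContext, jsonPath)
  out <- tempfile(pattern = "roundTrip", fileext = ".json")
  write.json(df, out)                  # write with the new API
  df2 <- read.json(sqlContext, out)    # read the output back in
  expect_equal(count(df2), count(df))  # same number of rows survives
  unlink(out)                          # clean up, as the patch now does
})
```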
test_that("jsonRDD() on a RDD with json string", {
@@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", {
expect_equal(count(sql(sqlContext, "select * from table1")), 2)
expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob")
dropTempTable(sqlContext, "table1")
+
+ unlink(jsonPath2)
+ unlink(parquetPath2)
})
test_that("table() returns a new DataFrame", {
@@ -848,33 +878,6 @@ test_that("column calculation", {
expect_equal(count(df2), 3)
})
-test_that("read.df() from json file", {
- df <- read.df(sqlContext, jsonPath, "json")
- expect_is(df, "DataFrame")
- expect_equal(count(df), 3)
-
- # Check if we can apply a user defined schema
- schema <- structType(structField("name", type = "string"),
- structField("age", type = "double"))
-
- df1 <- read.df(sqlContext, jsonPath, "json", schema)
- expect_is(df1, "DataFrame")
- expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
-
- # Run the same with loadDF
- df2 <- loadDF(sqlContext, jsonPath, "json", schema)
- expect_is(df2, "DataFrame")
- expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
-})
-
-test_that("write.df() as parquet file", {
- df <- read.df(sqlContext, jsonPath, "json")
- write.df(df, parquetPath, "parquet", mode="overwrite")
- df2 <- read.df(sqlContext, parquetPath, "parquet")
- expect_is(df2, "DataFrame")
- expect_equal(count(df2), 3)
-})
-
test_that("test HiveContext", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
@@ -895,6 +898,8 @@ test_that("test HiveContext", {
df3 <- sql(hiveCtx, "select * from json2")
expect_is(df3, "DataFrame")
expect_equal(count(df3), 3)
+
+ unlink(jsonPath2)
})
test_that("column operators", {
@@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", {
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))
+
+ unlink(jsonPath2)
+ unlink(jsonPath3)
})
test_that("toJSON() returns an RDD of the correct values", {
@@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
# Test base::intersect is working
expect_equal(length(intersect(1:20, 3:23)), 18)
+
+ unlink(jsonPath2)
})
test_that("withColumn() and withColumnRenamed()", {
@@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", {
detach(airquality)
})
-test_that("write.df() on DataFrame and works with read.parquet", {
- df <- read.json(sqlContext, jsonPath)
+test_that("read/write Parquet files", {
+ df <- read.df(sqlContext, jsonPath, "json")
+ # Test write.df and read.df
write.df(df, parquetPath, "parquet", mode="overwrite")
- parquetDF <- read.parquet(sqlContext, parquetPath)
- expect_is(parquetDF, "DataFrame")
- expect_equal(count(df), count(parquetDF))
-})
+ df2 <- read.df(sqlContext, parquetPath, "parquet")
+ expect_is(df2, "DataFrame")
+ expect_equal(count(df2), 3)
-test_that("read.parquet()/parquetFile() works with multiple input paths", {
- df <- read.json(sqlContext, jsonPath)
- write.df(df, parquetPath, "parquet", mode="overwrite")
+ # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
- write.df(df, parquetPath2, "parquet", mode="overwrite")
- parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
+ write.parquet(df, parquetPath2)
+ parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
+ suppressWarnings(saveAsParquetFile(df, parquetPath3))
+ parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3))
expect_is(parquetDF, "DataFrame")
expect_equal(count(parquetDF), count(df) * 2)
- parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2))
+ parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3))
expect_is(parquetDF2, "DataFrame")
expect_equal(count(parquetDF2), count(df) * 2)
# Test if varargs works with variables
saveMode <- "overwrite"
mergeSchema <- "true"
- parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
- write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
+  parquetPath4 <- tempfile(pattern = "parquetPath4", fileext = ".parquet")
+  write.df(df, parquetPath4, "parquet", mode = saveMode, mergeSchema = mergeSchema)
+
+ unlink(parquetPath2)
+ unlink(parquetPath3)
+ unlink(parquetPath4)
})
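The varargs check above matters because write.df() forwards any extra named arguments through ... to the underlying writer; a hedged sketch of the pattern the test exercises, using the same option names:

```r
saveMode <- "overwrite"
mergeSchema <- "true"
# mode and mergeSchema travel through write.df's ... into the JVM-side writer
write.df(df, tempfile(fileext = ".parquet"), "parquet",
         mode = saveMode, mergeSchema = mergeSchema)
```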
test_that("describe() and summarize() on a DataFrame", {