From 22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 16 Dec 2015 10:34:30 -0800 Subject: [SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR Add ```write.json``` and ```write.parquet``` for SparkR, and deprecated ```saveAsParquetFile```. Author: Yanbo Liang Closes #10281 from yanboliang/spark-12310. --- R/pkg/NAMESPACE | 4 +- R/pkg/R/DataFrame.R | 51 +++++++++++++-- R/pkg/R/generics.R | 16 +++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 104 +++++++++++++++++------------- 4 files changed, 119 insertions(+), 56 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index cab39d68c3..ccc01fe169 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -92,7 +92,9 @@ exportMethods("arrange", "with", "withColumn", "withColumnRenamed", - "write.df") + "write.df", + "write.json", + "write.parquet") exportClasses("Column") diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 380a13fe2b..0cfa12b997 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -596,17 +596,44 @@ setMethod("toJSON", RDD(jrdd, serializedMode = "string") }) -#' saveAsParquetFile +#' write.json +#' +#' Save the contents of a DataFrame as a JSON file (one object per line). Files written out +#' with this method can be read back in as a DataFrame using read.json(). +#' +#' @param x A SparkSQL DataFrame +#' @param path The directory where the file is saved +#' +#' @family DataFrame functions +#' @rdname write.json +#' @name write.json +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlContext <- sparkRSQL.init(sc) +#' path <- "path/to/file.json" +#' df <- read.json(sqlContext, path) +#' write.json(df, "/tmp/sparkr-tmp/") +#'} +setMethod("write.json", + signature(x = "DataFrame", path = "character"), + function(x, path) { + write <- callJMethod(x@sdf, "write") + invisible(callJMethod(write, "json", path)) + }) + +#' write.parquet #' #' Save the contents of a DataFrame as a Parquet file, preserving the schema. Files written out -#' with this method can be read back in as a DataFrame using parquetFile(). +#' with this method can be read back in as a DataFrame using read.parquet(). #' #' @param x A SparkSQL DataFrame #' @param path The directory where the file is saved #' #' @family DataFrame functions -#' @rdname saveAsParquetFile -#' @name saveAsParquetFile +#' @rdname write.parquet +#' @name write.parquet #' @export #' @examples #'\dontrun{ @@ -614,12 +641,24 @@ setMethod("toJSON", #' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" #' df <- read.json(sqlContext, path) -#' saveAsParquetFile(df, "/tmp/sparkr-tmp/") +#' write.parquet(df, "/tmp/sparkr-tmp1/") +#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/") #'} +setMethod("write.parquet", + signature(x = "DataFrame", path = "character"), + function(x, path) { + write <- callJMethod(x@sdf, "write") + invisible(callJMethod(write, "parquet", path)) + }) + +#' @rdname write.parquet +#' @name saveAsParquetFile +#' @export setMethod("saveAsParquetFile", signature(x = "DataFrame", path = "character"), function(x, path) { - invisible(callJMethod(x@sdf, "saveAsParquetFile", path)) + .Deprecated("write.parquet") + write.parquet(x, path) }) #' Distinct diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index c383e6e78b..62be2ddc8f 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -519,10 +519,6 @@ setGeneric("sample_frac", #' @export setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") }) -#' @rdname saveAsParquetFile -#' @export -setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") }) - #' @rdname saveAsTable #' @export setGeneric("saveAsTable", function(df, tableName, source, mode, ...) { @@ -541,6 +537,18 @@ setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") }) #' @export setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") }) +#' @rdname write.json +#' @export +setGeneric("write.json", function(x, path) { standardGeneric("write.json") }) + +#' @rdname write.parquet +#' @export +setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") }) + +#' @rdname write.parquet +#' @export +setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") }) + #' @rdname schema #' @export setGeneric("schema", function(x) { standardGeneric("schema") }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 071fd310fd..135c7576e5 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", { expect_equal(bob$height, 176.5) }) -test_that("read.json()/jsonFile() on a local file returns a DataFrame", { +test_that("read/write json files", { + # Test read.df + df <- read.df(sqlContext, jsonPath, "json") + expect_is(df, "DataFrame") + expect_equal(count(df), 3) + + # Test read.df with a user defined schema + schema <- structType(structField("name", type = "string"), + structField("age", type = "double")) + + df1 <- read.df(sqlContext, jsonPath, "json", schema) + expect_is(df1, "DataFrame") + expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double"))) + + # Test loadDF + df2 <- loadDF(sqlContext, jsonPath, "json", schema) + expect_is(df2, "DataFrame") + expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double"))) + + # Test read.json df <- read.json(sqlContext, jsonPath) expect_is(df, "DataFrame") expect_equal(count(df), 3) - # read.json()/jsonFile() works with multiple input paths + + # Test write.df jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json") write.df(df, jsonPath2, "json", mode="overwrite") - jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2)) + + # Test write.json + jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json") + write.json(df, jsonPath3) + + # Test read.json()/jsonFile() works with multiple input paths + jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3)) expect_is(jsonDF1, "DataFrame") expect_equal(count(jsonDF1), 6) # Suppress warnings because jsonFile is deprecated - jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2))) + jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3))) expect_is(jsonDF2, "DataFrame") expect_equal(count(jsonDF2), 6) unlink(jsonPath2) + unlink(jsonPath3) }) test_that("jsonRDD() on a RDD with json string", { @@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", { expect_equal(count(sql(sqlContext, "select * from table1")), 2) expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob") dropTempTable(sqlContext, "table1") + + unlink(jsonPath2) + unlink(parquetPath2) }) test_that("table() returns a new DataFrame", { @@ -848,33 +878,6 @@ test_that("column calculation", { expect_equal(count(df2), 3) }) -test_that("read.df() from json file", { - df <- read.df(sqlContext, jsonPath, "json") - expect_is(df, "DataFrame") - expect_equal(count(df), 3) - - # Check if we can apply a user defined schema - schema <- structType(structField("name", type = "string"), - structField("age", type = "double")) - - df1 <- read.df(sqlContext, jsonPath, "json", schema) - expect_is(df1, "DataFrame") - expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double"))) - - # Run the same with loadDF - df2 <- loadDF(sqlContext, jsonPath, "json", schema) - expect_is(df2, "DataFrame") - expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double"))) -}) - -test_that("write.df() as parquet file", { - df <- read.df(sqlContext, jsonPath, "json") - write.df(df, parquetPath, "parquet", mode="overwrite") - df2 <- read.df(sqlContext, parquetPath, "parquet") - expect_is(df2, "DataFrame") - expect_equal(count(df2), 3) -}) - test_that("test HiveContext", { ssc <- callJMethod(sc, "sc") hiveCtx <- tryCatch({ @@ -895,6 +898,8 @@ test_that("test HiveContext", { df3 <- sql(hiveCtx, "select * from json2") expect_is(df3, "DataFrame") expect_equal(count(df3), 3) + + unlink(jsonPath2) }) test_that("column operators", { @@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", { expect_error(merge(df, df3), paste("The following column name: name_y occurs more than once in the 'DataFrame'.", "Please use different suffixes for the intersected columns.", sep = "")) + + unlink(jsonPath2) + unlink(jsonPath3) }) test_that("toJSON() returns an RDD of the correct values", { @@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", { # Test base::intersect is working expect_equal(length(intersect(1:20, 3:23)), 18) + + unlink(jsonPath2) }) test_that("withColumn() and withColumnRenamed()", { @@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", { detach(airquality) }) -test_that("write.df() on DataFrame and works with read.parquet", { - df <- read.json(sqlContext, jsonPath) +test_that("read/write Parquet files", { + df <- read.df(sqlContext, jsonPath, "json") + # Test write.df and read.df write.df(df, parquetPath, "parquet", mode="overwrite") - parquetDF <- read.parquet(sqlContext, parquetPath) - expect_is(parquetDF, "DataFrame") - expect_equal(count(df), count(parquetDF)) -}) + df2 <- read.df(sqlContext, parquetPath, "parquet") + expect_is(df2, "DataFrame") + expect_equal(count(df2), 3) -test_that("read.parquet()/parquetFile() works with multiple input paths", { - df <- read.json(sqlContext, jsonPath) - write.df(df, parquetPath, "parquet", mode="overwrite") + # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet") - write.df(df, parquetPath2, "parquet", mode="overwrite") - parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2)) + write.parquet(df, parquetPath2) + parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet") + suppressWarnings(saveAsParquetFile(df, parquetPath3)) + parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3)) expect_is(parquetDF, "DataFrame") expect_equal(count(parquetDF), count(df) * 2) - parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2)) + parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3)) expect_is(parquetDF2, "DataFrame") expect_equal(count(parquetDF2), count(df) * 2) # Test if varargs works with variables saveMode <- "overwrite" mergeSchema <- "true" - parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet") - write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema) + parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet") + write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema) + + unlink(parquetPath2) + unlink(parquetPath3) + unlink(parquetPath4) }) test_that("describe() and summarize() on a DataFrame", { -- cgit v1.2.3