diff options
author | Yanbo Liang <ybliang8@gmail.com> | 2015-12-16 10:34:30 -0800 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-12-16 10:34:30 -0800 |
commit | 22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1 (patch) | |
tree | cc8e6e2e75599647daa850ae773f151b4753091e /R/pkg/inst/tests | |
parent | 2eb5af5f0d3c424dc617bb1a18dd0210ea9ba0bc (diff) | |
download | spark-22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1.tar.gz spark-22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1.tar.bz2 spark-22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1.zip |
[SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR
Add ```write.json``` and ```write.parquet``` for SparkR, and deprecated ```saveAsParquetFile```.
Author: Yanbo Liang <ybliang8@gmail.com>
Closes #10281 from yanboliang/spark-12310.
Diffstat (limited to 'R/pkg/inst/tests')
-rw-r--r-- | R/pkg/inst/tests/testthat/test_sparkSQL.R | 104 |
1 files changed, 59 insertions, 45 deletions
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 071fd310fd..135c7576e5 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", { expect_equal(bob$height, 176.5) }) -test_that("read.json()/jsonFile() on a local file returns a DataFrame", { +test_that("read/write json files", { + # Test read.df + df <- read.df(sqlContext, jsonPath, "json") + expect_is(df, "DataFrame") + expect_equal(count(df), 3) + + # Test read.df with a user defined schema + schema <- structType(structField("name", type = "string"), + structField("age", type = "double")) + + df1 <- read.df(sqlContext, jsonPath, "json", schema) + expect_is(df1, "DataFrame") + expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double"))) + + # Test loadDF + df2 <- loadDF(sqlContext, jsonPath, "json", schema) + expect_is(df2, "DataFrame") + expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double"))) + + # Test read.json df <- read.json(sqlContext, jsonPath) expect_is(df, "DataFrame") expect_equal(count(df), 3) - # read.json()/jsonFile() works with multiple input paths + + # Test write.df jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json") write.df(df, jsonPath2, "json", mode="overwrite") - jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2)) + + # Test write.json + jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json") + write.json(df, jsonPath3) + + # Test read.json()/jsonFile() works with multiple input paths + jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3)) expect_is(jsonDF1, "DataFrame") expect_equal(count(jsonDF1), 6) # Suppress warnings because jsonFile is deprecated - jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2))) + jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3))) expect_is(jsonDF2, "DataFrame") expect_equal(count(jsonDF2), 6) unlink(jsonPath2) + unlink(jsonPath3) }) test_that("jsonRDD() on a RDD with json string", { @@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", { expect_equal(count(sql(sqlContext, "select * from table1")), 2) expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob") dropTempTable(sqlContext, "table1") + + unlink(jsonPath2) + unlink(parquetPath2) }) test_that("table() returns a new DataFrame", { @@ -848,33 +878,6 @@ test_that("column calculation", { expect_equal(count(df2), 3) }) -test_that("read.df() from json file", { - df <- read.df(sqlContext, jsonPath, "json") - expect_is(df, "DataFrame") - expect_equal(count(df), 3) - - # Check if we can apply a user defined schema - schema <- structType(structField("name", type = "string"), - structField("age", type = "double")) - - df1 <- read.df(sqlContext, jsonPath, "json", schema) - expect_is(df1, "DataFrame") - expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double"))) - - # Run the same with loadDF - df2 <- loadDF(sqlContext, jsonPath, "json", schema) - expect_is(df2, "DataFrame") - expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double"))) -}) - -test_that("write.df() as parquet file", { - df <- read.df(sqlContext, jsonPath, "json") - write.df(df, parquetPath, "parquet", mode="overwrite") - df2 <- read.df(sqlContext, parquetPath, "parquet") - expect_is(df2, "DataFrame") - expect_equal(count(df2), 3) -}) - test_that("test HiveContext", { ssc <- callJMethod(sc, "sc") hiveCtx <- tryCatch({ @@ -895,6 +898,8 @@ test_that("test HiveContext", { df3 <- sql(hiveCtx, "select * from json2") expect_is(df3, "DataFrame") expect_equal(count(df3), 3) + + unlink(jsonPath2) }) test_that("column operators", { @@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", { expect_error(merge(df, df3), paste("The following column name: name_y occurs more than once in the 'DataFrame'.", "Please use different suffixes for the intersected columns.", sep = "")) + + unlink(jsonPath2) + unlink(jsonPath3) }) test_that("toJSON() returns an RDD of the correct values", { @@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", { # Test base::intersect is working expect_equal(length(intersect(1:20, 3:23)), 18) + + unlink(jsonPath2) }) test_that("withColumn() and withColumnRenamed()", { @@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", { detach(airquality) }) -test_that("write.df() on DataFrame and works with read.parquet", { - df <- read.json(sqlContext, jsonPath) +test_that("read/write Parquet files", { + df <- read.df(sqlContext, jsonPath, "json") + # Test write.df and read.df write.df(df, parquetPath, "parquet", mode="overwrite") - parquetDF <- read.parquet(sqlContext, parquetPath) - expect_is(parquetDF, "DataFrame") - expect_equal(count(df), count(parquetDF)) -}) + df2 <- read.df(sqlContext, parquetPath, "parquet") + expect_is(df2, "DataFrame") + expect_equal(count(df2), 3) -test_that("read.parquet()/parquetFile() works with multiple input paths", { - df <- read.json(sqlContext, jsonPath) - write.df(df, parquetPath, "parquet", mode="overwrite") + # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet") - write.df(df, parquetPath2, "parquet", mode="overwrite") - parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2)) + write.parquet(df, parquetPath2) + parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet") + suppressWarnings(saveAsParquetFile(df, parquetPath3)) + parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3)) expect_is(parquetDF, "DataFrame") expect_equal(count(parquetDF), count(df) * 2) - parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2)) + parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3)) expect_is(parquetDF2, "DataFrame") expect_equal(count(parquetDF2), count(df) * 2) # Test if varargs works with variables saveMode <- "overwrite" mergeSchema <- "true" - parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet") - write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema) + parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet") + write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema) + + unlink(parquetPath2) + unlink(parquetPath3) + unlink(parquetPath4) }) test_that("describe() and summarize() on a DataFrame", { |