author    Yanbo Liang <ybliang8@gmail.com>    2015-12-16 10:34:30 -0800
committer Shivaram Venkataraman <shivaram@cs.berkeley.edu>    2015-12-16 10:34:30 -0800
commit    22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1 (patch)
tree      cc8e6e2e75599647daa850ae773f151b4753091e /R/pkg/inst/tests
parent    2eb5af5f0d3c424dc617bb1a18dd0210ea9ba0bc (diff)
[SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR
Add ```write.json``` and ```write.parquet``` for SparkR, and deprecate ```saveAsParquetFile```.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #10281 from yanboliang/spark-12310.
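For context, a minimal sketch of the API this patch adds, assuming a SparkR 1.6-era shell where `sc` and `sqlContext` already exist; the sample JSON path is illustrative, and the writer signatures match the ones exercised in the tests below:

```r
# Sketch only: assumes an initialized SparkR shell with `sqlContext`,
# and a JSON file at an illustrative path.
people <- read.json(sqlContext, "examples/src/main/resources/people.json")

# New writers introduced by this patch
write.json(people, tempfile(pattern = "people", fileext = ".json"))
write.parquet(people, tempfile(pattern = "people", fileext = ".parquet"))

# The old writer still works but is now deprecated and emits a warning
suppressWarnings(saveAsParquetFile(people, tempfile(fileext = ".parquet")))
```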
Diffstat (limited to 'R/pkg/inst/tests')
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R  | 104
1 file changed, 59 insertions(+), 45 deletions(-)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 071fd310fd..135c7576e5 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
})
-test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
+test_that("read/write json files", {
+ # Test read.df
+ df <- read.df(sqlContext, jsonPath, "json")
+ expect_is(df, "DataFrame")
+ expect_equal(count(df), 3)
+
+ # Test read.df with a user defined schema
+ schema <- structType(structField("name", type = "string"),
+ structField("age", type = "double"))
+
+ df1 <- read.df(sqlContext, jsonPath, "json", schema)
+ expect_is(df1, "DataFrame")
+ expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
+
+ # Test loadDF
+ df2 <- loadDF(sqlContext, jsonPath, "json", schema)
+ expect_is(df2, "DataFrame")
+ expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
+
+ # Test read.json
df <- read.json(sqlContext, jsonPath)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
- # read.json()/jsonFile() works with multiple input paths
+
+ # Test write.df
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
write.df(df, jsonPath2, "json", mode="overwrite")
- jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))
+
+ # Test write.json
+ jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json")
+ write.json(df, jsonPath3)
+
+ # Test read.json()/jsonFile() works with multiple input paths
+ jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3))
expect_is(jsonDF1, "DataFrame")
expect_equal(count(jsonDF1), 6)
# Suppress warnings because jsonFile is deprecated
- jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
+ jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3)))
expect_is(jsonDF2, "DataFrame")
expect_equal(count(jsonDF2), 6)
unlink(jsonPath2)
+ unlink(jsonPath3)
})
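As the hunk above shows, `read.json` (and the deprecated `jsonFile`) accepts a vector of paths and reads all of them as a single DataFrame. A hedged round-trip sketch, assuming `df` and `sqlContext` as in the test:

```r
# Sketch: write the same DataFrame twice, then read both files back as one
p1 <- tempfile(pattern = "p1", fileext = ".json")
p2 <- tempfile(pattern = "p2", fileext = ".json")
write.json(df, p1)
write.json(df, p2)
combined <- read.json(sqlContext, c(p1, p2))
stopifnot(count(combined) == 2 * count(df))
unlink(c(p1, p2))
```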
test_that("jsonRDD() on a RDD with json string", {
@@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", {
expect_equal(count(sql(sqlContext, "select * from table1")), 2)
expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob")
dropTempTable(sqlContext, "table1")
+
+ unlink(jsonPath2)
+ unlink(parquetPath2)
})
test_that("table() returns a new DataFrame", {
@@ -848,33 +878,6 @@ test_that("column calculation", {
expect_equal(count(df2), 3)
})
-test_that("read.df() from json file", {
- df <- read.df(sqlContext, jsonPath, "json")
- expect_is(df, "DataFrame")
- expect_equal(count(df), 3)
-
- # Check if we can apply a user defined schema
- schema <- structType(structField("name", type = "string"),
- structField("age", type = "double"))
-
- df1 <- read.df(sqlContext, jsonPath, "json", schema)
- expect_is(df1, "DataFrame")
- expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
-
- # Run the same with loadDF
- df2 <- loadDF(sqlContext, jsonPath, "json", schema)
- expect_is(df2, "DataFrame")
- expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
-})
-
-test_that("write.df() as parquet file", {
- df <- read.df(sqlContext, jsonPath, "json")
- write.df(df, parquetPath, "parquet", mode="overwrite")
- df2 <- read.df(sqlContext, parquetPath, "parquet")
- expect_is(df2, "DataFrame")
- expect_equal(count(df2), 3)
-})
-
test_that("test HiveContext", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
@@ -895,6 +898,8 @@ test_that("test HiveContext", {
df3 <- sql(hiveCtx, "select * from json2")
expect_is(df3, "DataFrame")
expect_equal(count(df3), 3)
+
+ unlink(jsonPath2)
})
test_that("column operators", {
@@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", {
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))
+
+ unlink(jsonPath2)
+ unlink(jsonPath3)
})
test_that("toJSON() returns an RDD of the correct values", {
@@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
# Test base::intersect is working
expect_equal(length(intersect(1:20, 3:23)), 18)
+
+ unlink(jsonPath2)
})
test_that("withColumn() and withColumnRenamed()", {
@@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", {
detach(airquality)
})
-test_that("write.df() on DataFrame and works with read.parquet", {
- df <- read.json(sqlContext, jsonPath)
+test_that("read/write Parquet files", {
+ df <- read.df(sqlContext, jsonPath, "json")
+ # Test write.df and read.df
write.df(df, parquetPath, "parquet", mode="overwrite")
- parquetDF <- read.parquet(sqlContext, parquetPath)
- expect_is(parquetDF, "DataFrame")
- expect_equal(count(df), count(parquetDF))
-})
+ df2 <- read.df(sqlContext, parquetPath, "parquet")
+ expect_is(df2, "DataFrame")
+ expect_equal(count(df2), 3)
-test_that("read.parquet()/parquetFile() works with multiple input paths", {
- df <- read.json(sqlContext, jsonPath)
- write.df(df, parquetPath, "parquet", mode="overwrite")
+ # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
- write.df(df, parquetPath2, "parquet", mode="overwrite")
- parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
+ write.parquet(df, parquetPath2)
+ parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
+ suppressWarnings(saveAsParquetFile(df, parquetPath3))
+ parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3))
expect_is(parquetDF, "DataFrame")
expect_equal(count(parquetDF), count(df) * 2)
- parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2))
+ parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3))
expect_is(parquetDF2, "DataFrame")
expect_equal(count(parquetDF2), count(df) * 2)
# Test if varargs works with variables
saveMode <- "overwrite"
mergeSchema <- "true"
- parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
- write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
+ parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
+ write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema)
+
+ unlink(parquetPath2)
+ unlink(parquetPath3)
+ unlink(parquetPath4)
})
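The tail of this hunk checks that extra named arguments to `write.df` are forwarded to the data source as options (here Parquet's `mergeSchema`). A minimal sketch of that varargs pass-through, assuming the same `df` and `sqlContext` and that the option is honored on read as well:

```r
# Sketch: named varargs become data source options
saveMode    <- "overwrite"
mergeSchema <- "true"
out <- tempfile(pattern = "varargs", fileext = ".parquet")
write.df(df, out, "parquet", mode = saveMode, mergeSchema = mergeSchema)
back <- read.df(sqlContext, out, "parquet", mergeSchema = mergeSchema)
stopifnot(count(back) == count(df))
unlink(out)
```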
test_that("describe() and summarize() on a DataFrame", {