path: root/R/pkg/inst
author	Yanbo Liang <ybliang8@gmail.com>	2015-12-11 11:47:35 -0800
committer	Shivaram Venkataraman <shivaram@cs.berkeley.edu>	2015-12-11 11:47:35 -0800
commit	0fb9825556dbbcc98d7eafe9ddea8676301e09bb (patch)
tree	f89813a20b893dbf65c019e4a031a7d3e189c135 /R/pkg/inst
parent	c119a34d1e9e599e302acfda92e5de681086a19f (diff)
[SPARK-12146][SPARKR] SparkR jsonFile should support multiple input files
* ```jsonFile``` should support multiple input files, such as:
  ```R
  jsonFile(sqlContext, c("path1", "path2"))  # character vector as arguments
  jsonFile(sqlContext, "path1,path2")
  ```
* Meanwhile, ```jsonFile``` has been deprecated by Spark SQL and will be removed in Spark 2.0, so we mark ```jsonFile``` as deprecated and use ```read.json``` on the SparkR side.
* Replace all uses of ```jsonFile``` with ```read.json``` in test_sparkSQL.R, but keep one jsonFile test case.
* If this PR is accepted, we should make almost the same change for ```parquetFile```.

cc felixcheung sun-rui shivaram

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #10145 from yanboliang/spark-12146.
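For context, here is a minimal SparkR sketch of the usage this change enables. It assumes a local Spark 1.6-era session; the JSON paths are hypothetical and stand in for real files on disk.

```R
library(SparkR)

# Spark 1.x-style initialization
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

# Hypothetical input files; replace with real JSON files
paths <- c("/tmp/people1.json", "/tmp/people2.json")

# read.json now accepts a character vector of paths
df <- read.json(sqlContext, paths)
printSchema(df)
count(df)  # rows from both files combined

# jsonFile() still works but is deprecated and emits a warning,
# which is why the updated test wraps it in suppressWarnings()
df2 <- suppressWarnings(jsonFile(sqlContext, paths))
count(df2)

sparkR.stop()
```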
Diffstat (limited to 'R/pkg/inst')
-rw-r--r--	R/pkg/inst/tests/testthat/test_sparkSQL.R	120
1 file changed, 66 insertions(+), 54 deletions(-)
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 2051784427..ed9b2c9d4d 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -330,7 +330,7 @@ writeLines(mockLinesMapType, mapTypeJsonPath)
test_that("Collect DataFrame with complex types", {
# ArrayType
- df <- jsonFile(sqlContext, complexTypeJsonPath)
+ df <- read.json(sqlContext, complexTypeJsonPath)
ldf <- collect(df)
expect_equal(nrow(ldf), 3)
@@ -357,7 +357,7 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
# StructType
- df <- jsonFile(sqlContext, mapTypeJsonPath)
+ df <- read.json(sqlContext, mapTypeJsonPath)
expect_equal(dtypes(df), list(c("info", "struct<age:bigint,height:double>"),
c("name", "string")))
ldf <- collect(df)
@@ -371,10 +371,22 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
})
-test_that("jsonFile() on a local file returns a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
+ df <- read.json(sqlContext, jsonPath)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
+ # read.json()/jsonFile() works with multiple input paths
+ jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
+ write.df(df, jsonPath2, "json", mode="overwrite")
+ jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))
+ expect_is(jsonDF1, "DataFrame")
+ expect_equal(count(jsonDF1), 6)
+ # Suppress warnings because jsonFile is deprecated
+ jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
+ expect_is(jsonDF2, "DataFrame")
+ expect_equal(count(jsonDF2), 6)
+
+ unlink(jsonPath2)
})
test_that("jsonRDD() on a RDD with json string", {
@@ -391,7 +403,7 @@ test_that("jsonRDD() on a RDD with json string", {
})
test_that("test cache, uncache and clearCache", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
cacheTable(sqlContext, "table1")
uncacheTable(sqlContext, "table1")
@@ -400,7 +412,7 @@ test_that("test cache, uncache and clearCache", {
})
test_that("test tableNames and tables", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
expect_equal(length(tableNames(sqlContext)), 1)
df <- tables(sqlContext)
@@ -409,7 +421,7 @@ test_that("test tableNames and tables", {
})
test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
newdf <- sql(sqlContext, "SELECT * FROM table1 where name = 'Michael'")
expect_is(newdf, "DataFrame")
@@ -445,7 +457,7 @@ test_that("insertInto() on a registered table", {
})
test_that("table() returns a new DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
tabledf <- table(sqlContext, "table1")
expect_is(tabledf, "DataFrame")
@@ -458,14 +470,14 @@ test_that("table() returns a new DataFrame", {
})
test_that("toRDD() returns an RRDD", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testRDD <- toRDD(df)
expect_is(testRDD, "RDD")
expect_equal(count(testRDD), 3)
})
test_that("union on two RDDs created from DataFrames returns an RRDD", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
RDD1 <- toRDD(df)
RDD2 <- toRDD(df)
unioned <- unionRDD(RDD1, RDD2)
@@ -487,7 +499,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {
writeLines(textLines, textPath)
textRDD <- textFile(sc, textPath)
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
dfRDD <- toRDD(df)
unionByte <- unionRDD(rdd, dfRDD)
@@ -505,7 +517,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {
test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern="spark-test", fileext=".tmp")
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
dfRDD <- toRDD(df)
saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)
@@ -516,7 +528,7 @@ test_that("objectFile() works with row serialization", {
})
test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testRDD <- lapply(df, function(row) {
row$newCol <- row$age + 5
row
@@ -528,7 +540,7 @@ test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
})
test_that("collect() returns a data.frame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
rdf <- collect(df)
expect_true(is.data.frame(rdf))
expect_equal(names(rdf)[1], "age")
@@ -550,14 +562,14 @@ test_that("collect() returns a data.frame", {
})
test_that("limit() returns DataFrame with the correct number of rows", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
dfLimited <- limit(df, 2)
expect_is(dfLimited, "DataFrame")
expect_equal(count(dfLimited), 2)
})
test_that("collect() and take() on a DataFrame return the same number of rows and columns", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_equal(nrow(collect(df)), nrow(take(df, 10)))
expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})
@@ -584,7 +596,7 @@ test_that("collect() support Unicode characters", {
})
test_that("multiple pipeline transformations result in an RDD with the correct values", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
first <- lapply(df, function(row) {
row$age <- row$age + 5
row
@@ -601,7 +613,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
})
test_that("cache(), persist(), and unpersist() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_false(df@env$isCached)
cache(df)
expect_true(df@env$isCached)
@@ -620,7 +632,7 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", {
})
test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testSchema <- schema(df)
expect_equal(length(testSchema$fields()), 2)
expect_equal(testSchema$fields()[[1]]$dataType.toString(), "LongType")
@@ -641,7 +653,7 @@ test_that("schema(), dtypes(), columns(), names() return the correct values/form
})
test_that("names() colnames() set the column names", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
names(df) <- c("col1", "col2")
expect_equal(colnames(df)[2], "col2")
@@ -661,7 +673,7 @@ test_that("names() colnames() set the column names", {
})
test_that("head() and first() return the correct data", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testHead <- head(df)
expect_equal(nrow(testHead), 3)
expect_equal(ncol(testHead), 2)
@@ -694,7 +706,7 @@ test_that("distinct() and unique on DataFrames", {
jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPathWithDup)
- df <- jsonFile(sqlContext, jsonPathWithDup)
+ df <- read.json(sqlContext, jsonPathWithDup)
uniques <- distinct(df)
expect_is(uniques, "DataFrame")
expect_equal(count(uniques), 3)
@@ -705,7 +717,7 @@ test_that("distinct() and unique on DataFrames", {
})
test_that("sample on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_is(sampled, "DataFrame")
@@ -721,7 +733,7 @@ test_that("sample on a DataFrame", {
})
test_that("select operators", {
- df <- select(jsonFile(sqlContext, jsonPath), "name", "age")
+ df <- select(read.json(sqlContext, jsonPath), "name", "age")
expect_is(df$name, "Column")
expect_is(df[[2]], "Column")
expect_is(df[["age"]], "Column")
@@ -747,7 +759,7 @@ test_that("select operators", {
})
test_that("select with column", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
df1 <- select(df, "name")
expect_equal(columns(df1), c("name"))
expect_equal(count(df1), 3)
@@ -770,8 +782,8 @@ test_that("select with column", {
})
test_that("subsetting", {
- # jsonFile returns columns in random order
- df <- select(jsonFile(sqlContext, jsonPath), "name", "age")
+ # read.json returns columns in random order
+ df <- select(read.json(sqlContext, jsonPath), "name", "age")
filtered <- df[df$age > 20,]
expect_equal(count(filtered), 1)
expect_equal(columns(filtered), c("name", "age"))
@@ -808,7 +820,7 @@ test_that("subsetting", {
})
test_that("selectExpr() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
selected <- selectExpr(df, "age * 2")
expect_equal(names(selected), "(age * 2)")
expect_equal(collect(selected), collect(select(df, df$age * 2L)))
@@ -819,12 +831,12 @@ test_that("selectExpr() on a DataFrame", {
})
test_that("expr() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_equal(collect(select(df, expr("abs(-123)")))[1, 1], 123)
})
test_that("column calculation", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
d <- collect(select(df, alias(df$age + 1, "age2")))
expect_equal(names(d), c("age2"))
df2 <- select(df, lower(df$name), abs(df$age))
@@ -915,7 +927,7 @@ test_that("column functions", {
expect_equal(class(rank())[[1]], "Column")
expect_equal(rank(1:3), as.numeric(c(1:3)))
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
expect_equal(collect(df2)[[2, 1]], TRUE)
expect_equal(collect(df2)[[2, 2]], FALSE)
@@ -983,7 +995,7 @@ test_that("column binary mathfunctions", {
"{\"a\":4, \"b\":8}")
jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPathWithDup)
- df <- jsonFile(sqlContext, jsonPathWithDup)
+ df <- read.json(sqlContext, jsonPathWithDup)
expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5))
expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6))
expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7))
@@ -1004,7 +1016,7 @@ test_that("column binary mathfunctions", {
})
test_that("string operators", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_equal(count(where(df, like(df$name, "A%"))), 1)
expect_equal(count(where(df, startsWith(df$name, "A"))), 1)
expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
@@ -1100,7 +1112,7 @@ test_that("when(), otherwise() and ifelse() on a DataFrame", {
})
test_that("group by, agg functions", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
df1 <- agg(df, name = "max", age = "sum")
expect_equal(1, count(df1))
df1 <- agg(df, age2 = max(df$age))
@@ -1145,7 +1157,7 @@ test_that("group by, agg functions", {
"{\"name\":\"ID2\", \"value\": \"-3\"}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines2, jsonPath2)
- gd2 <- groupBy(jsonFile(sqlContext, jsonPath2), "name")
+ gd2 <- groupBy(read.json(sqlContext, jsonPath2), "name")
df6 <- agg(gd2, value = "sum")
df6_local <- collect(df6)
expect_equal(42, df6_local[df6_local$name == "ID1",][1, 2])
@@ -1162,7 +1174,7 @@ test_that("group by, agg functions", {
"{\"name\":\"Justin\", \"age\":1}")
jsonPath3 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines3, jsonPath3)
- df8 <- jsonFile(sqlContext, jsonPath3)
+ df8 <- read.json(sqlContext, jsonPath3)
gd3 <- groupBy(df8, "name")
gd3_local <- collect(sum(gd3))
expect_equal(60, gd3_local[gd3_local$name == "Andy",][1, 2])
@@ -1181,7 +1193,7 @@ test_that("group by, agg functions", {
})
test_that("arrange() and orderBy() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
sorted <- arrange(df, df$age)
expect_equal(collect(sorted)[1,2], "Michael")
@@ -1207,7 +1219,7 @@ test_that("arrange() and orderBy() on a DataFrame", {
})
test_that("filter() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
filtered <- filter(df, "age > 20")
expect_equal(count(filtered), 1)
expect_equal(collect(filtered)$name, "Andy")
@@ -1230,7 +1242,7 @@ test_that("filter() on a DataFrame", {
})
test_that("join() and merge() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
"{\"name\":\"Andy\", \"test\": \"no\"}",
@@ -1238,7 +1250,7 @@ test_that("join() and merge() on a DataFrame", {
"{\"name\":\"Bob\", \"test\": \"yes\"}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines2, jsonPath2)
- df2 <- jsonFile(sqlContext, jsonPath2)
+ df2 <- read.json(sqlContext, jsonPath2)
joined <- join(df, df2)
expect_equal(names(joined), c("age", "name", "name", "test"))
@@ -1313,14 +1325,14 @@ test_that("join() and merge() on a DataFrame", {
"{\"name\":\"Bob\", \"name_y\":\"Bob\", \"test\": \"yes\"}")
jsonPath3 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines3, jsonPath3)
- df3 <- jsonFile(sqlContext, jsonPath3)
+ df3 <- read.json(sqlContext, jsonPath3)
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))
})
test_that("toJSON() returns an RDD of the correct values", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testRDD <- toJSON(df)
expect_is(testRDD, "RDD")
expect_equal(getSerializedMode(testRDD), "string")
@@ -1328,7 +1340,7 @@ test_that("toJSON() returns an RDD of the correct values", {
})
test_that("showDF()", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
s <- capture.output(showDF(df))
expected <- paste("+----+-------+\n",
"| age| name|\n",
@@ -1341,12 +1353,12 @@ test_that("showDF()", {
})
test_that("isLocal()", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_false(isLocal(df))
})
test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"Andy\", \"age\":30}",
@@ -1383,7 +1395,7 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
})
test_that("withColumn() and withColumnRenamed()", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
expect_equal(length(columns(newDF)), 3)
expect_equal(columns(newDF)[3], "newAge")
@@ -1395,7 +1407,7 @@ test_that("withColumn() and withColumnRenamed()", {
})
test_that("mutate(), transform(), rename() and names()", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
newDF <- mutate(df, newAge = df$age + 2)
expect_equal(length(columns(newDF)), 3)
expect_equal(columns(newDF)[3], "newAge")
@@ -1425,7 +1437,7 @@ test_that("mutate(), transform(), rename() and names()", {
})
test_that("write.df() on DataFrame and works with read.parquet", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- read.parquet(sqlContext, parquetPath)
expect_is(parquetDF, "DataFrame")
@@ -1433,7 +1445,7 @@ test_that("write.df() on DataFrame and works with read.parquet", {
})
test_that("read.parquet()/parquetFile() works with multiple input paths", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode="overwrite")
@@ -1452,7 +1464,7 @@ test_that("read.parquet()/parquetFile() works with multiple input paths", {
})
test_that("describe() and summarize() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
stats <- describe(df, "age")
expect_equal(collect(stats)[1, "summary"], "count")
expect_equal(collect(stats)[2, "age"], "24.5")
@@ -1470,7 +1482,7 @@ test_that("describe() and summarize() on a DataFrame", {
})
test_that("dropna() and na.omit() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPathNa)
+ df <- read.json(sqlContext, jsonPathNa)
rows <- collect(df)
# drop with columns
@@ -1556,7 +1568,7 @@ test_that("dropna() and na.omit() on a DataFrame", {
})
test_that("fillna() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPathNa)
+ df <- read.json(sqlContext, jsonPathNa)
rows <- collect(df)
# fill with value
@@ -1665,7 +1677,7 @@ test_that("Method as.data.frame as a synonym for collect()", {
})
test_that("attach() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_error(age)
attach(df)
expect_is(age, "DataFrame")
@@ -1713,7 +1725,7 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", {
list("a"="b", "c"="d", "e"="f")))))
expect_equal(coltypes(x), "map<string,string>")
- df <- selectExpr(jsonFile(sqlContext, jsonPath), "name", "(age * 1.21) as age")
+ df <- selectExpr(read.json(sqlContext, jsonPath), "name", "(age * 1.21) as age")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))
df1 <- select(df, cast(df$age, "integer"))