author     Yanbo Liang <ybliang8@gmail.com>                     2015-12-11 11:47:35 -0800
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>     2015-12-11 11:47:35 -0800
commit     0fb9825556dbbcc98d7eafe9ddea8676301e09bb (patch)
tree       f89813a20b893dbf65c019e4a031a7d3e189c135
parent     c119a34d1e9e599e302acfda92e5de681086a19f (diff)
[SPARK-12146][SPARKR] SparkR jsonFile should support multiple input files
* ```jsonFile``` should support multiple input files, such as:
```R
jsonFile(sqlContext, c("path1", "path2")) # character vector as arguments
jsonFile(sqlContext, "path1,path2")
```
* Meanwhile, ```jsonFile``` has been deprecated by Spark SQL and will be removed in Spark 2.0, so we mark ```jsonFile``` deprecated and use ```read.json``` on the SparkR side.
* Replace all ```jsonFile``` calls with ```read.json``` in test_sparkSQL.R, but keep a ```jsonFile``` test case.
* If this PR is accepted, we should make almost the same change for ```parquetFile```.

cc felixcheung sun-rui shivaram

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #10145 from yanboliang/spark-12146.
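A minimal usage sketch of the API after this change, assuming the Spark 1.6-era SparkR entry points (```sparkR.init```/```sparkRSQL.init```); the file paths are placeholders:

```R
library(SparkR)

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

# Read a single JSON file (one JSON object per line).
df <- read.json(sqlContext, "path/to/part1.json")

# Read multiple input files by passing a character vector of paths.
df2 <- read.json(sqlContext, c("path/to/part1.json", "path/to/part2.json"))

# jsonFile() still works, but now emits a deprecation warning.
df3 <- suppressWarnings(jsonFile(sqlContext, c("path/to/part1.json", "path/to/part2.json")))
```

Internally, ```read.json``` normalizes the paths and passes them as a list to the JVM ```DataFrameReader.json``` method, which is what lets a vector of paths work.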
-rw-r--r--  R/pkg/NAMESPACE                              1
-rw-r--r--  R/pkg/R/DataFrame.R                        102
-rw-r--r--  R/pkg/R/SQLContext.R                        29
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R  120
-rw-r--r--  examples/src/main/r/dataframe.R              2
5 files changed, 138 insertions, 116 deletions
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ba64bc59ed..cab39d68c3 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -267,6 +267,7 @@ export("as.DataFrame",
"createExternalTable",
"dropTempTable",
"jsonFile",
+ "read.json",
"loadDF",
"parquetFile",
"read.df",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index f4c4a2585e..975b058c0a 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -24,14 +24,14 @@ setOldClass("jobj")
#' @title S4 class that represents a DataFrame
#' @description DataFrames can be created using functions like \link{createDataFrame},
-#' \link{jsonFile}, \link{table} etc.
+#' \link{read.json}, \link{table} etc.
#' @family DataFrame functions
#' @rdname DataFrame
#' @docType class
#'
#' @slot env An R environment that stores bookkeeping states of the DataFrame
#' @slot sdf A Java object reference to the backing Scala DataFrame
-#' @seealso \link{createDataFrame}, \link{jsonFile}, \link{table}
+#' @seealso \link{createDataFrame}, \link{read.json}, \link{table}
#' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkr-dataframes}
#' @export
#' @examples
@@ -77,7 +77,7 @@ dataFrame <- function(sdf, isCached = FALSE) {
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' printSchema(df)
#'}
setMethod("printSchema",
@@ -102,7 +102,7 @@ setMethod("printSchema",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' dfSchema <- schema(df)
#'}
setMethod("schema",
@@ -126,7 +126,7 @@ setMethod("schema",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' explain(df, TRUE)
#'}
setMethod("explain",
@@ -157,7 +157,7 @@ setMethod("explain",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' isLocal(df)
#'}
setMethod("isLocal",
@@ -182,7 +182,7 @@ setMethod("isLocal",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' showDF(df)
#'}
setMethod("showDF",
@@ -207,7 +207,7 @@ setMethod("showDF",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' df
#'}
setMethod("show", "DataFrame",
@@ -234,7 +234,7 @@ setMethod("show", "DataFrame",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' dtypes(df)
#'}
setMethod("dtypes",
@@ -261,7 +261,7 @@ setMethod("dtypes",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' columns(df)
#' colnames(df)
#'}
@@ -376,7 +376,7 @@ setMethod("coltypes",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' coltypes(df) <- c("character", "integer")
#' coltypes(df) <- c(NA, "numeric")
#'}
@@ -423,7 +423,7 @@ setMethod("coltypes<-",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' registerTempTable(df, "json_df")
#' new_df <- sql(sqlContext, "SELECT * FROM json_df")
#'}
@@ -476,7 +476,7 @@ setMethod("insertInto",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' cache(df)
#'}
setMethod("cache",
@@ -504,7 +504,7 @@ setMethod("cache",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' persist(df, "MEMORY_AND_DISK")
#'}
setMethod("persist",
@@ -532,7 +532,7 @@ setMethod("persist",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' persist(df, "MEMORY_AND_DISK")
#' unpersist(df)
#'}
@@ -560,7 +560,7 @@ setMethod("unpersist",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' newDF <- repartition(df, 2L)
#'}
setMethod("repartition",
@@ -585,7 +585,7 @@ setMethod("repartition",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' newRDD <- toJSON(df)
#'}
setMethod("toJSON",
@@ -613,7 +613,7 @@ setMethod("toJSON",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
#'}
setMethod("saveAsParquetFile",
@@ -637,7 +637,7 @@ setMethod("saveAsParquetFile",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' distinctDF <- distinct(df)
#'}
setMethod("distinct",
@@ -672,7 +672,7 @@ setMethod("unique",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' collect(sample(df, FALSE, 0.5))
#' collect(sample(df, TRUE, 0.5))
#'}
@@ -711,7 +711,7 @@ setMethod("sample_frac",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' count(df)
#' }
setMethod("count",
@@ -741,7 +741,7 @@ setMethod("nrow",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' ncol(df)
#' }
setMethod("ncol",
@@ -762,7 +762,7 @@ setMethod("ncol",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' dim(df)
#' }
setMethod("dim",
@@ -786,7 +786,7 @@ setMethod("dim",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' collected <- collect(df)
#' firstName <- collected[[1]]$name
#' }
@@ -858,7 +858,7 @@ setMethod("collect",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' limitedDF <- limit(df, 10)
#' }
setMethod("limit",
@@ -879,7 +879,7 @@ setMethod("limit",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' take(df, 2)
#' }
setMethod("take",
@@ -908,7 +908,7 @@ setMethod("take",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' head(df)
#' }
setMethod("head",
@@ -931,7 +931,7 @@ setMethod("head",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' first(df)
#' }
setMethod("first",
@@ -952,7 +952,7 @@ setMethod("first",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' rdd <- toRDD(df)
#'}
setMethod("toRDD",
@@ -1298,7 +1298,7 @@ setMethod("select",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' selectExpr(df, "col1", "(col2 * 5) as newCol")
#' }
setMethod("selectExpr",
@@ -1327,7 +1327,7 @@ setMethod("selectExpr",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' newDF <- withColumn(df, "newCol", df$col1 * 5)
#' }
setMethod("withColumn",
@@ -1352,7 +1352,7 @@ setMethod("withColumn",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' newDF <- mutate(df, newCol = df$col1 * 5, newCol2 = df$col1 * 2)
#' names(newDF) # Will contain newCol, newCol2
#' newDF2 <- transform(df, newCol = df$col1 / 5, newCol2 = df$col1 * 2)
@@ -1402,7 +1402,7 @@ setMethod("transform",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' newDF <- withColumnRenamed(df, "col1", "newCol1")
#' }
setMethod("withColumnRenamed",
@@ -1427,7 +1427,7 @@ setMethod("withColumnRenamed",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' newDF <- rename(df, col1 = df$newCol1)
#' }
setMethod("rename",
@@ -1471,7 +1471,7 @@ setClassUnion("characterOrColumn", c("character", "Column"))
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' arrange(df, df$col1)
#' arrange(df, asc(df$col1), desc(abs(df$col2)))
#' arrange(df, "col1", decreasing = TRUE)
@@ -1547,7 +1547,7 @@ setMethod("orderBy",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' filter(df, "col1 > 0")
#' filter(df, df$col2 != "abcdefg")
#' }
@@ -1591,8 +1591,8 @@ setMethod("where",
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlContext, path)
-#' df2 <- jsonFile(sqlContext, path2)
+#' df1 <- read.json(sqlContext, path)
+#' df2 <- read.json(sqlContext, path2)
#' join(df1, df2) # Performs a Cartesian
#' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
#' join(df1, df2, df1$col1 == df2$col2, "right_outer")
@@ -1648,8 +1648,8 @@ setMethod("join",
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlContext, path)
-#' df2 <- jsonFile(sqlContext, path2)
+#' df1 <- read.json(sqlContext, path)
+#' df2 <- read.json(sqlContext, path2)
#' merge(df1, df2) # Performs a Cartesian
#' merge(df1, df2, by = "col1") # Performs an inner join based on expression
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
@@ -1781,8 +1781,8 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlContext, path)
-#' df2 <- jsonFile(sqlContext, path2)
+#' df1 <- read.json(sqlContext, path)
+#' df2 <- read.json(sqlContext, path2)
#' unioned <- unionAll(df, df2)
#' }
setMethod("unionAll",
@@ -1824,8 +1824,8 @@ setMethod("rbind",
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlContext, path)
-#' df2 <- jsonFile(sqlContext, path2)
+#' df1 <- read.json(sqlContext, path)
+#' df2 <- read.json(sqlContext, path2)
#' intersectDF <- intersect(df, df2)
#' }
setMethod("intersect",
@@ -1851,8 +1851,8 @@ setMethod("intersect",
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
-#' df1 <- jsonFile(sqlContext, path)
-#' df2 <- jsonFile(sqlContext, path2)
+#' df1 <- read.json(sqlContext, path)
+#' df2 <- read.json(sqlContext, path2)
#' exceptDF <- except(df, df2)
#' }
#' @rdname except
@@ -1892,7 +1892,7 @@ setMethod("except",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' write.df(df, "myfile", "parquet", "overwrite")
#' saveDF(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
#' }
@@ -1957,7 +1957,7 @@ setMethod("saveDF",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' saveAsTable(df, "myfile")
#' }
setMethod("saveAsTable",
@@ -1998,7 +1998,7 @@ setMethod("saveAsTable",
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' describe(df)
#' describe(df, "col1")
#' describe(df, "col1", "col2")
@@ -2054,7 +2054,7 @@ setMethod("summary",
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- read.json(sqlCtx, path)
#' dropna(df)
#' }
setMethod("dropna",
@@ -2108,7 +2108,7 @@ setMethod("na.omit",
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- read.json(sqlCtx, path)
#' fillna(df, 1)
#' fillna(df, list("age" = 20, "name" = "unknown"))
#' }
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index f678c70a7a..9243d70e66 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -208,24 +208,33 @@ setMethod("toDF", signature(x = "RDD"),
#' @param sqlContext SQLContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @return DataFrame
+#' @rdname read.json
+#' @name read.json
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
#' df <- jsonFile(sqlContext, path)
#' }
-
-jsonFile <- function(sqlContext, path) {
+read.json <- function(sqlContext, path) {
# Allow the user to have a more flexible definition of the text file path
- path <- suppressWarnings(normalizePath(path))
- # Convert a string vector of paths to a string containing comma separated paths
- path <- paste(path, collapse = ",")
- sdf <- callJMethod(sqlContext, "jsonFile", path)
+ paths <- as.list(suppressWarnings(normalizePath(path)))
+ read <- callJMethod(sqlContext, "read")
+ sdf <- callJMethod(read, "json", paths)
dataFrame(sdf)
}
+#' @rdname read.json
+#' @name jsonFile
+#' @export
+jsonFile <- function(sqlContext, path) {
+ .Deprecated("read.json")
+ read.json(sqlContext, path)
+}
+
#' JSON RDD
#'
@@ -299,7 +308,7 @@ parquetFile <- function(sqlContext, ...) {
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' registerTempTable(df, "table")
#' new_df <- sql(sqlContext, "SELECT * FROM table")
#' }
@@ -323,7 +332,7 @@ sql <- function(sqlContext, sqlQuery) {
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' registerTempTable(df, "table")
#' new_df <- table(sqlContext, "table")
#' }
@@ -396,7 +405,7 @@ tableNames <- function(sqlContext, databaseName = NULL) {
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' registerTempTable(df, "table")
#' cacheTable(sqlContext, "table")
#' }
@@ -418,7 +427,7 @@ cacheTable <- function(sqlContext, tableName) {
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
-#' df <- jsonFile(sqlContext, path)
+#' df <- read.json(sqlContext, path)
#' registerTempTable(df, "table")
#' uncacheTable(sqlContext, "table")
#' }
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 2051784427..ed9b2c9d4d 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -330,7 +330,7 @@ writeLines(mockLinesMapType, mapTypeJsonPath)
test_that("Collect DataFrame with complex types", {
# ArrayType
- df <- jsonFile(sqlContext, complexTypeJsonPath)
+ df <- read.json(sqlContext, complexTypeJsonPath)
ldf <- collect(df)
expect_equal(nrow(ldf), 3)
@@ -357,7 +357,7 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
# StructType
- df <- jsonFile(sqlContext, mapTypeJsonPath)
+ df <- read.json(sqlContext, mapTypeJsonPath)
expect_equal(dtypes(df), list(c("info", "struct<age:bigint,height:double>"),
c("name", "string")))
ldf <- collect(df)
@@ -371,10 +371,22 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
})
-test_that("jsonFile() on a local file returns a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
+ df <- read.json(sqlContext, jsonPath)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
+ # read.json()/jsonFile() works with multiple input paths
+ jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
+ write.df(df, jsonPath2, "json", mode="overwrite")
+ jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))
+ expect_is(jsonDF1, "DataFrame")
+ expect_equal(count(jsonDF1), 6)
+ # Suppress warnings because jsonFile is deprecated
+ jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
+ expect_is(jsonDF2, "DataFrame")
+ expect_equal(count(jsonDF2), 6)
+
+ unlink(jsonPath2)
})
test_that("jsonRDD() on a RDD with json string", {
@@ -391,7 +403,7 @@ test_that("jsonRDD() on a RDD with json string", {
})
test_that("test cache, uncache and clearCache", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
cacheTable(sqlContext, "table1")
uncacheTable(sqlContext, "table1")
@@ -400,7 +412,7 @@ test_that("test cache, uncache and clearCache", {
})
test_that("test tableNames and tables", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
expect_equal(length(tableNames(sqlContext)), 1)
df <- tables(sqlContext)
@@ -409,7 +421,7 @@ test_that("test tableNames and tables", {
})
test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
newdf <- sql(sqlContext, "SELECT * FROM table1 where name = 'Michael'")
expect_is(newdf, "DataFrame")
@@ -445,7 +457,7 @@ test_that("insertInto() on a registered table", {
})
test_that("table() returns a new DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
tabledf <- table(sqlContext, "table1")
expect_is(tabledf, "DataFrame")
@@ -458,14 +470,14 @@ test_that("table() returns a new DataFrame", {
})
test_that("toRDD() returns an RRDD", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testRDD <- toRDD(df)
expect_is(testRDD, "RDD")
expect_equal(count(testRDD), 3)
})
test_that("union on two RDDs created from DataFrames returns an RRDD", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
RDD1 <- toRDD(df)
RDD2 <- toRDD(df)
unioned <- unionRDD(RDD1, RDD2)
@@ -487,7 +499,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {
writeLines(textLines, textPath)
textRDD <- textFile(sc, textPath)
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
dfRDD <- toRDD(df)
unionByte <- unionRDD(rdd, dfRDD)
@@ -505,7 +517,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {
test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern="spark-test", fileext=".tmp")
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
dfRDD <- toRDD(df)
saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)
@@ -516,7 +528,7 @@ test_that("objectFile() works with row serialization", {
})
test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testRDD <- lapply(df, function(row) {
row$newCol <- row$age + 5
row
@@ -528,7 +540,7 @@ test_that("lapply() on a DataFrame returns an RDD with the correct columns", {
})
test_that("collect() returns a data.frame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
rdf <- collect(df)
expect_true(is.data.frame(rdf))
expect_equal(names(rdf)[1], "age")
@@ -550,14 +562,14 @@ test_that("collect() returns a data.frame", {
})
test_that("limit() returns DataFrame with the correct number of rows", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
dfLimited <- limit(df, 2)
expect_is(dfLimited, "DataFrame")
expect_equal(count(dfLimited), 2)
})
test_that("collect() and take() on a DataFrame return the same number of rows and columns", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_equal(nrow(collect(df)), nrow(take(df, 10)))
expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})
@@ -584,7 +596,7 @@ test_that("collect() support Unicode characters", {
})
test_that("multiple pipeline transformations result in an RDD with the correct values", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
first <- lapply(df, function(row) {
row$age <- row$age + 5
row
@@ -601,7 +613,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
})
test_that("cache(), persist(), and unpersist() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_false(df@env$isCached)
cache(df)
expect_true(df@env$isCached)
@@ -620,7 +632,7 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", {
})
test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testSchema <- schema(df)
expect_equal(length(testSchema$fields()), 2)
expect_equal(testSchema$fields()[[1]]$dataType.toString(), "LongType")
@@ -641,7 +653,7 @@ test_that("schema(), dtypes(), columns(), names() return the correct values/form
})
test_that("names() colnames() set the column names", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
names(df) <- c("col1", "col2")
expect_equal(colnames(df)[2], "col2")
@@ -661,7 +673,7 @@ test_that("names() colnames() set the column names", {
})
test_that("head() and first() return the correct data", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testHead <- head(df)
expect_equal(nrow(testHead), 3)
expect_equal(ncol(testHead), 2)
@@ -694,7 +706,7 @@ test_that("distinct() and unique on DataFrames", {
jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPathWithDup)
- df <- jsonFile(sqlContext, jsonPathWithDup)
+ df <- read.json(sqlContext, jsonPathWithDup)
uniques <- distinct(df)
expect_is(uniques, "DataFrame")
expect_equal(count(uniques), 3)
@@ -705,7 +717,7 @@ test_that("distinct() and unique on DataFrames", {
})
test_that("sample on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
sampled <- sample(df, FALSE, 1.0)
expect_equal(nrow(collect(sampled)), count(df))
expect_is(sampled, "DataFrame")
@@ -721,7 +733,7 @@ test_that("sample on a DataFrame", {
})
test_that("select operators", {
- df <- select(jsonFile(sqlContext, jsonPath), "name", "age")
+ df <- select(read.json(sqlContext, jsonPath), "name", "age")
expect_is(df$name, "Column")
expect_is(df[[2]], "Column")
expect_is(df[["age"]], "Column")
@@ -747,7 +759,7 @@ test_that("select operators", {
})
test_that("select with column", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
df1 <- select(df, "name")
expect_equal(columns(df1), c("name"))
expect_equal(count(df1), 3)
@@ -770,8 +782,8 @@ test_that("select with column", {
})
test_that("subsetting", {
- # jsonFile returns columns in random order
- df <- select(jsonFile(sqlContext, jsonPath), "name", "age")
+ # read.json returns columns in random order
+ df <- select(read.json(sqlContext, jsonPath), "name", "age")
filtered <- df[df$age > 20,]
expect_equal(count(filtered), 1)
expect_equal(columns(filtered), c("name", "age"))
@@ -808,7 +820,7 @@ test_that("subsetting", {
})
test_that("selectExpr() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
selected <- selectExpr(df, "age * 2")
expect_equal(names(selected), "(age * 2)")
expect_equal(collect(selected), collect(select(df, df$age * 2L)))
@@ -819,12 +831,12 @@ test_that("selectExpr() on a DataFrame", {
})
test_that("expr() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_equal(collect(select(df, expr("abs(-123)")))[1, 1], 123)
})
test_that("column calculation", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
d <- collect(select(df, alias(df$age + 1, "age2")))
expect_equal(names(d), c("age2"))
df2 <- select(df, lower(df$name), abs(df$age))
@@ -915,7 +927,7 @@ test_that("column functions", {
expect_equal(class(rank())[[1]], "Column")
expect_equal(rank(1:3), as.numeric(c(1:3)))
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
expect_equal(collect(df2)[[2, 1]], TRUE)
expect_equal(collect(df2)[[2, 2]], FALSE)
@@ -983,7 +995,7 @@ test_that("column binary mathfunctions", {
"{\"a\":4, \"b\":8}")
jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPathWithDup)
- df <- jsonFile(sqlContext, jsonPathWithDup)
+ df <- read.json(sqlContext, jsonPathWithDup)
expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5))
expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6))
expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7))
@@ -1004,7 +1016,7 @@ test_that("column binary mathfunctions", {
})
test_that("string operators", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_equal(count(where(df, like(df$name, "A%"))), 1)
expect_equal(count(where(df, startsWith(df$name, "A"))), 1)
expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi")
@@ -1100,7 +1112,7 @@ test_that("when(), otherwise() and ifelse() on a DataFrame", {
})
test_that("group by, agg functions", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
df1 <- agg(df, name = "max", age = "sum")
expect_equal(1, count(df1))
df1 <- agg(df, age2 = max(df$age))
@@ -1145,7 +1157,7 @@ test_that("group by, agg functions", {
"{\"name\":\"ID2\", \"value\": \"-3\"}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines2, jsonPath2)
- gd2 <- groupBy(jsonFile(sqlContext, jsonPath2), "name")
+ gd2 <- groupBy(read.json(sqlContext, jsonPath2), "name")
df6 <- agg(gd2, value = "sum")
df6_local <- collect(df6)
expect_equal(42, df6_local[df6_local$name == "ID1",][1, 2])
@@ -1162,7 +1174,7 @@ test_that("group by, agg functions", {
"{\"name\":\"Justin\", \"age\":1}")
jsonPath3 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines3, jsonPath3)
- df8 <- jsonFile(sqlContext, jsonPath3)
+ df8 <- read.json(sqlContext, jsonPath3)
gd3 <- groupBy(df8, "name")
gd3_local <- collect(sum(gd3))
expect_equal(60, gd3_local[gd3_local$name == "Andy",][1, 2])
@@ -1181,7 +1193,7 @@ test_that("group by, agg functions", {
})
test_that("arrange() and orderBy() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
sorted <- arrange(df, df$age)
expect_equal(collect(sorted)[1,2], "Michael")
@@ -1207,7 +1219,7 @@ test_that("arrange() and orderBy() on a DataFrame", {
})
test_that("filter() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
filtered <- filter(df, "age > 20")
expect_equal(count(filtered), 1)
expect_equal(collect(filtered)$name, "Andy")
@@ -1230,7 +1242,7 @@ test_that("filter() on a DataFrame", {
})
test_that("join() and merge() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
"{\"name\":\"Andy\", \"test\": \"no\"}",
@@ -1238,7 +1250,7 @@ test_that("join() and merge() on a DataFrame", {
"{\"name\":\"Bob\", \"test\": \"yes\"}")
jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines2, jsonPath2)
- df2 <- jsonFile(sqlContext, jsonPath2)
+ df2 <- read.json(sqlContext, jsonPath2)
joined <- join(df, df2)
expect_equal(names(joined), c("age", "name", "name", "test"))
@@ -1313,14 +1325,14 @@ test_that("join() and merge() on a DataFrame", {
"{\"name\":\"Bob\", \"name_y\":\"Bob\", \"test\": \"yes\"}")
jsonPath3 <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLines3, jsonPath3)
- df3 <- jsonFile(sqlContext, jsonPath3)
+ df3 <- read.json(sqlContext, jsonPath3)
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))
})
test_that("toJSON() returns an RDD of the correct values", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
testRDD <- toJSON(df)
expect_is(testRDD, "RDD")
expect_equal(getSerializedMode(testRDD), "string")
@@ -1328,7 +1340,7 @@ test_that("toJSON() returns an RDD of the correct values", {
})
test_that("showDF()", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
s <- capture.output(showDF(df))
expected <- paste("+----+-------+\n",
"| age| name|\n",
@@ -1341,12 +1353,12 @@ test_that("showDF()", {
})
test_that("isLocal()", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_false(isLocal(df))
})
test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"Andy\", \"age\":30}",
@@ -1383,7 +1395,7 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
})
test_that("withColumn() and withColumnRenamed()", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
expect_equal(length(columns(newDF)), 3)
expect_equal(columns(newDF)[3], "newAge")
@@ -1395,7 +1407,7 @@ test_that("withColumn() and withColumnRenamed()", {
})
test_that("mutate(), transform(), rename() and names()", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
newDF <- mutate(df, newAge = df$age + 2)
expect_equal(length(columns(newDF)), 3)
expect_equal(columns(newDF)[3], "newAge")
@@ -1425,7 +1437,7 @@ test_that("mutate(), transform(), rename() and names()", {
})
test_that("write.df() on DataFrame and works with read.parquet", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- read.parquet(sqlContext, parquetPath)
expect_is(parquetDF, "DataFrame")
@@ -1433,7 +1445,7 @@ test_that("write.df() on DataFrame and works with read.parquet", {
})
test_that("read.parquet()/parquetFile() works with multiple input paths", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode="overwrite")
@@ -1452,7 +1464,7 @@ test_that("read.parquet()/parquetFile() works with multiple input paths", {
})
test_that("describe() and summarize() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
stats <- describe(df, "age")
expect_equal(collect(stats)[1, "summary"], "count")
expect_equal(collect(stats)[2, "age"], "24.5")
@@ -1470,7 +1482,7 @@ test_that("describe() and summarize() on a DataFrame", {
})
test_that("dropna() and na.omit() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPathNa)
+ df <- read.json(sqlContext, jsonPathNa)
rows <- collect(df)
# drop with columns
@@ -1556,7 +1568,7 @@ test_that("dropna() and na.omit() on a DataFrame", {
})
test_that("fillna() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPathNa)
+ df <- read.json(sqlContext, jsonPathNa)
rows <- collect(df)
# fill with value
@@ -1665,7 +1677,7 @@ test_that("Method as.data.frame as a synonym for collect()", {
})
test_that("attach() on a DataFrame", {
- df <- jsonFile(sqlContext, jsonPath)
+ df <- read.json(sqlContext, jsonPath)
expect_error(age)
attach(df)
expect_is(age, "DataFrame")
@@ -1713,7 +1725,7 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", {
list("a"="b", "c"="d", "e"="f")))))
expect_equal(coltypes(x), "map<string,string>")
- df <- selectExpr(jsonFile(sqlContext, jsonPath), "name", "(age * 1.21) as age")
+ df <- selectExpr(read.json(sqlContext, jsonPath), "name", "(age * 1.21) as age")
expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)")))
df1 <- select(df, cast(df$age, "integer"))
diff --git a/examples/src/main/r/dataframe.R b/examples/src/main/r/dataframe.R
index 53b817144f..62f60e57ee 100644
--- a/examples/src/main/r/dataframe.R
+++ b/examples/src/main/r/dataframe.R
@@ -35,7 +35,7 @@ printSchema(df)
# Create a DataFrame from a JSON file
path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
-peopleDF <- jsonFile(sqlContext, path)
+peopleDF <- read.json(sqlContext, path)
printSchema(peopleDF)
# Register this DataFrame as a table.