author     felixcheung <felixcheung_m@hotmail.com>    2016-01-04 22:32:07 -0800
committer  Reynold Xin <rxin@databricks.com>          2016-01-04 22:32:07 -0800
commit     cc4d5229c98a589da76a4d5e5fdc5ea92385183b (patch)
tree       450ee4286632f96c991d0cfd9b67f1509529c68f /R
parent     b634901bb28070ac5d9a24a9bc7b7640472a54e2 (diff)
[SPARK-12625][SPARKR][SQL] replace R usage of Spark SQL deprecated API
rxin davies shivaram

Took the save-mode handling from my PR #10480 and moved everything to writer methods. This is related to PR #10559.

- [x] jsonRDD() seems to be broken and needs investigation; it is not a public API, though. Will look into it some more tonight. (fixed)

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #10584 from felixcheung/rremovedeprecated.
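The change replaces direct calls to the deprecated JVM-side save/insertInto/jsonRDD methods with the DataFrameWriter/DataFrameReader chain. A minimal sketch of the pattern, drawn from the diff below (df, source, jmode, path, and options are the variables used there):

    # before: deprecated one-shot save on the DataFrame itself
    callJMethod(df@sdf, "save", source, jmode, options)
    # after: obtain a DataFrameWriter, then chain format/mode before saving
    write <- callJMethod(df@sdf, "write")
    write <- callJMethod(write, "format", source)
    write <- callJMethod(write, "mode", jmode)
    callJMethod(write, "save", path)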
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/R/DataFrame.R                         33
-rw-r--r--  R/pkg/R/SQLContext.R                        10
-rw-r--r--  R/pkg/R/column.R                             2
-rw-r--r--  R/pkg/R/utils.R                              9
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R    4
5 files changed, 33 insertions, 25 deletions
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0cfa12b997..c126f9efb4 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -458,7 +458,10 @@ setMethod("registerTempTable",
setMethod("insertInto",
signature(x = "DataFrame", tableName = "character"),
function(x, tableName, overwrite = FALSE) {
- callJMethod(x@sdf, "insertInto", tableName, overwrite)
+ jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
+ write <- callJMethod(x@sdf, "write")
+ write <- callJMethod(write, "mode", jmode)
+ callJMethod(write, "insertInto", tableName)
})
#' Cache
@@ -1948,18 +1951,15 @@ setMethod("write.df",
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
- allModes <- c("append", "overwrite", "error", "ignore")
- # nolint start
- if (!(mode %in% allModes)) {
- stop('mode should be one of "append", "overwrite", "error", "ignore"')
- }
- # nolint end
- jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
- callJMethod(df@sdf, "save", source, jmode, options)
+ write <- callJMethod(df@sdf, "write")
+ write <- callJMethod(write, "format", source)
+ write <- callJMethod(write, "mode", jmode)
+ write <- callJMethod(write, "save", path)
})
#' @rdname write.df
@@ -2013,15 +2013,14 @@ setMethod("saveAsTable",
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
- allModes <- c("append", "overwrite", "error", "ignore")
- # nolint start
- if (!(mode %in% allModes)) {
- stop('mode should be one of "append", "overwrite", "error", "ignore"')
- }
- # nolint end
- jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
- callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
+
+ write <- callJMethod(df@sdf, "write")
+ write <- callJMethod(write, "format", source)
+ write <- callJMethod(write, "mode", jmode)
+ write <- callJMethod(write, "options", options)
+ callJMethod(write, "saveAsTable", tableName)
})
#' summary
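Note that the public SparkR entry points are unchanged by this refactor; only the underlying JVM calls move to writer methods. A hypothetical example of the unchanged API (df and the path/table names are made up):

    write.df(df, path = "people.parquet", source = "parquet", mode = "overwrite")
    saveAsTable(df, tableName = "people", source = "parquet", mode = "error")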
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 9243d70e66..ccc683d86a 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -256,9 +256,12 @@ jsonFile <- function(sqlContext, path) {
# TODO: support schema
jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
+ .Deprecated("read.json")
rdd <- serializeToString(rdd)
if (is.null(schema)) {
- sdf <- callJMethod(sqlContext, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio)
+ read <- callJMethod(sqlContext, "read")
+ # samplingRatio is deprecated
+ sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd"))
dataFrame(sdf)
} else {
stop("not implemented")
@@ -289,10 +292,7 @@ read.parquet <- function(sqlContext, path) {
# TODO: Implement saveasParquetFile and write examples for both
parquetFile <- function(sqlContext, ...) {
.Deprecated("read.parquet")
- # Allow the user to have a more flexible definiton of the text file path
- paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x)))
- sdf <- callJMethod(sqlContext, "parquetFile", paths)
- dataFrame(sdf)
+ read.parquet(sqlContext, unlist(list(...)))
}
#' SQL Query
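For reference, the deprecated parquetFile() above now just forwards its paths to read.parquet(); a hypothetical equivalence (paths are made up):

    df1 <- suppressWarnings(parquetFile(sqlContext, "part1.parquet", "part2.parquet"))
    df2 <- read.parquet(sqlContext, c("part1.parquet", "part2.parquet"))  # same result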
diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R
index 356bcee3cf..3ffd9a9890 100644
--- a/R/pkg/R/column.R
+++ b/R/pkg/R/column.R
@@ -209,7 +209,7 @@ setMethod("cast",
setMethod("%in%",
signature(x = "Column"),
function(x, table) {
- jc <- callJMethod(x@jc, "in", as.list(table))
+ jc <- callJMethod(x@jc, "isin", as.list(table))
return(column(jc))
})
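Catalyst's Column dropped the deprecated "in" method in favor of "isin", so %in% now dispatches to the new name; user code is unaffected. A hypothetical usage sketch:

    # keep rows whose age column matches any of the listed values
    filter(df, df$age %in% c(19, 30))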
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 43105aaa38..aa386e5da9 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -641,3 +641,12 @@ assignNewEnv <- function(data) {
splitString <- function(input) {
Filter(nzchar, unlist(strsplit(input, ",|\\s")))
}
+
+convertToJSaveMode <- function(mode) {
+ allModes <- c("append", "overwrite", "error", "ignore")
+ if (!(mode %in% allModes)) {
+ stop('mode should be one of "append", "overwrite", "error", "ignore"') # nolint
+ }
+ jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ jmode
+}
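The new convertToJSaveMode() helper centralizes the mode validation and SaveMode lookup that write.df and saveAsTable previously duplicated inline. An illustrative call sketch:

    jmode <- convertToJSaveMode("overwrite")  # returns a JVM SaveMode object
    convertToJSaveMode("overwrit")            # stops: 'mode should be one of ...'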
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9e5d0ebf60..ebe8faa34c 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -423,12 +423,12 @@ test_that("read/write json files", {
test_that("jsonRDD() on a RDD with json string", {
rdd <- parallelize(sc, mockLines)
expect_equal(count(rdd), 3)
- df <- jsonRDD(sqlContext, rdd)
+ df <- suppressWarnings(jsonRDD(sqlContext, rdd))
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
rdd2 <- flatMap(rdd, function(x) c(x, x))
- df <- jsonRDD(sqlContext, rdd2)
+ df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
expect_is(df, "DataFrame")
expect_equal(count(df), 6)
})