diff options
author | Narine Kokhlikyan <narine.kokhlikyan@gmail.com> | 2016-01-22 10:35:02 -0800 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2016-01-22 10:35:02 -0800 |
commit | 8a88e121283472c26e70563a4e04c109e9b183b3 (patch) | |
tree | b2b1498caee3049c8315639b012a46155a0aa6fd /R/pkg | |
parent | e13c147e74a52d74e259f04e49e368fab64cdc1f (diff) | |
download | spark-8a88e121283472c26e70563a4e04c109e9b183b3.tar.gz spark-8a88e121283472c26e70563a4e04c109e9b183b3.tar.bz2 spark-8a88e121283472c26e70563a4e04c109e9b183b3.zip |
[SPARK-12629][SPARKR] Fixes for DataFrame saveAsTable method
I've tried to solve some of the issues mentioned in: https://issues.apache.org/jira/browse/SPARK-12629
Please let me know what you think.
Thanks!
Author: Narine Kokhlikyan <narine.kokhlikyan@gmail.com>
Closes #10580 from NarineK/sparkrSavaAsRable.
Diffstat (limited to 'R/pkg')
-rw-r--r-- | R/pkg/R/DataFrame.R | 23 | ||||
-rw-r--r-- | R/pkg/R/generics.R | 12 | ||||
-rw-r--r-- | R/pkg/inst/tests/testthat/test_sparkSQL.R | 15 |
3 files changed, 41 insertions, 9 deletions
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 4653a73e11..3b7b8250b9 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1997,7 +1997,13 @@ setMethod("write.df", signature(df = "DataFrame", path = "character"), function(df, path, source = NULL, mode = "error", ...){ if (is.null(source)) { - sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) + if (exists(".sparkRSQLsc", envir = .sparkREnv)) { + sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) + } else if (exists(".sparkRHivesc", envir = .sparkREnv)) { + sqlContext <- get(".sparkRHivesc", envir = .sparkREnv) + } else { + stop("sparkRHive or sparkRSQL context has to be specified") + } source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") } @@ -2055,13 +2061,18 @@ setMethod("saveDF", #' saveAsTable(df, "myfile") #' } setMethod("saveAsTable", - signature(df = "DataFrame", tableName = "character", source = "character", - mode = "character"), + signature(df = "DataFrame", tableName = "character"), function(df, tableName, source = NULL, mode="error", ...){ if (is.null(source)) { - sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) - source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", - "org.apache.spark.sql.parquet") + if (exists(".sparkRSQLsc", envir = .sparkREnv)) { + sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) + } else if (exists(".sparkRHivesc", envir = .sparkREnv)) { + sqlContext <- get(".sparkRHivesc", envir = .sparkREnv) + } else { + stop("sparkRHive or sparkRSQL context has to be specified") + } + source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", + "org.apache.spark.sql.parquet") } jmode <- convertToJSaveMode(mode) options <- varargsToEnv(...) 
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 9a8ab97bb8..04784d5156 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -539,7 +539,7 @@ setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("samp #' @rdname saveAsTable #' @export -setGeneric("saveAsTable", function(df, tableName, source, mode, ...) { +setGeneric("saveAsTable", function(df, tableName, source = NULL, mode = "error", ...) { standardGeneric("saveAsTable") }) @@ -552,7 +552,15 @@ setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") }) #' @rdname write.df #' @export -setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") }) +setGeneric("write.df", function(df, path, source = NULL, mode = "error", ...) { + standardGeneric("write.df") +}) + +#' @rdname write.df +#' @export +setGeneric("saveDF", function(df, path, source = NULL, mode = "error", ...) { + standardGeneric("saveDF") +}) #' @rdname write.json #' @export diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index e59841ab9f..b52a11fb1a 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -953,8 +953,21 @@ test_that("test HiveContext", { df3 <- sql(hiveCtx, "select * from json2") expect_is(df3, "DataFrame") expect_equal(count(df3), 3) - unlink(jsonPath2) + + hivetestDataPath <- tempfile(pattern="sparkr-test", fileext=".tmp") + invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath)) + df4 <- sql(hiveCtx, "select * from hivetestbl") + expect_is(df4, "DataFrame") + expect_equal(count(df4), 3) + unlink(hivetestDataPath) + + parquetDataPath <- tempfile(pattern="sparkr-test", fileext=".tmp") + invisible(saveAsTable(df, "parquetest", "parquet", mode="overwrite", path=parquetDataPath)) + df5 <- sql(hiveCtx, "select * from parquetest") + expect_is(df5, "DataFrame") + expect_equal(count(df5), 3) + unlink(parquetDataPath) }) test_that("column operators", { |