From 5a693b4138d4ce948e3bcdbe28d5c01d5deb8fa9 Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Thu, 6 Apr 2017 09:15:13 -0700
Subject: [SPARK-20195][SPARKR][SQL] add createTable catalog API and deprecate createExternalTable

## What changes were proposed in this pull request?

Following up on #17483, add createTable (which is new in 2.2.0) and deprecate createExternalTable, plus a number of minor fixes.

## How was this patch tested?

manual, unit tests

Author: Felix Cheung

Closes #17511 from felixcheung/rceatetable.
---
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       |  4 +--
 R/pkg/R/catalog.R                         | 59 ++++++++++++++++++++++++++-----
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 20 ++++++++---
 4 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 9b7e95ce30..ca45c6f9b0 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -361,6 +361,7 @@ export("as.DataFrame",
        "clearCache",
        "createDataFrame",
        "createExternalTable",
+       "createTable",
        "currentDatabase",
        "dropTempTable",
        "dropTempView",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 97786df4ae..ec85f723c0 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -557,7 +557,7 @@ setMethod("insertInto",
             jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
             write <- callJMethod(x@sdf, "write")
             write <- callJMethod(write, "mode", jmode)
-            callJMethod(write, "insertInto", tableName)
+            invisible(callJMethod(write, "insertInto", tableName))
           })
 
 #' Cache
@@ -2894,7 +2894,7 @@ setMethod("saveAsTable",
             write <- callJMethod(write, "format", source)
             write <- callJMethod(write, "mode", jmode)
             write <- callJMethod(write, "options", options)
-            callJMethod(write, "saveAsTable", tableName)
+            invisible(callJMethod(write, "saveAsTable", tableName))
           })
 
 #' summary
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index 4b7f841b55..e59a702433 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -17,7 +17,7 @@
 
 # catalog.R: SparkSession catalog functions
 
-#' Create an external table
+#' (Deprecated) Create an external table
 #'
 #' Creates an external table based on the dataset in a data source.
 #' Returns a SparkDataFrame associated with the external table.
@@ -29,10 +29,11 @@
 #' @param tableName a name of the table.
 #' @param path the path of files to load.
 #' @param source the name of external data source.
-#' @param schema the schema of the data for certain data source.
+#' @param schema the schema of the data required for some data sources.
 #' @param ... additional argument(s) passed to the method.
 #' @return A SparkDataFrame.
-#' @rdname createExternalTable
+#' @rdname createExternalTable-deprecated
+#' @seealso \link{createTable}
 #' @export
 #' @examples
 #'\dontrun{
@@ -43,24 +44,64 @@
 #' @method createExternalTable default
 #' @note createExternalTable since 1.4.0
 createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
+  .Deprecated("createTable", old = "createExternalTable")
+  createTable(tableName, path, source, schema, ...)
+}
+
+createExternalTable <- function(x, ...) {
+  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
+}
+
+#' Creates a table based on the dataset in a data source
+#'
+#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
+#' the table.
+#'
+#' The data source is specified by the \code{source} and a set of options(...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
+#' created from the data at the given path. Otherwise a managed table is created.
+#'
+#' @param tableName the qualified or unqualified name that designates a table. If no database
+#'                  identifier is provided, it refers to a table in the current database.
+#' @param path (optional) the path of files to load.
+#' @param source (optional) the name of the data source.
+#' @param schema (optional) the schema of the data required for some data sources.
+#' @param ... additional named parameters as options for the data source.
+#' @return A SparkDataFrame.
+#' @rdname createTable
+#' @seealso \link{createExternalTable}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createTable("myjson", path = "path/to/json", source = "json", schema = schema)
+#'
+#' createTable("people", source = "json", schema = schema)
+#' insertInto(df, "people")
+#' }
+#' @name createTable
+#' @note createTable since 2.2.0
+createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
   sparkSession <- getSparkSession()
   options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
+  if (is.null(source)) {
+    source <- getDefaultSqlSource()
+  }
   catalog <- callJMethod(sparkSession, "catalog")
   if (is.null(schema)) {
-    sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
+    sdf <- callJMethod(catalog, "createTable", tableName, source, options)
+  } else if (class(schema) == "structType") {
+    sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
   } else {
-    sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
+    stop("schema must be a structType.")
   }
   dataFrame(sdf)
 }
 
-createExternalTable <- function(x, ...) {
-  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
-}
-
 #' Cache Table
 #'
 #' Caches the specified table in-memory.
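To make the new catalog API above concrete, here is a minimal usage sketch in R. It is not part of the patch: it assumes a running SparkR session with Hive support, and the table names and the "path/to/people.json" location are illustrative only.

    library(SparkR)
    sparkR.session()

    # Managed table: no path is given, so the table's data lives under the
    # warehouse directory once rows are inserted into it.
    schema <- structType(structField("name", "string"), structField("age", "integer"))
    createTable("people", source = "json", schema = schema)

    # External table: a path is given, so the catalog entry points at the
    # existing JSON files instead of managing a copy of them.
    ext <- createTable("people_ext", path = "path/to/people.json", source = "json")

    # The old entry point still works, but now emits a deprecation warning and
    # forwards its arguments to createTable().
    old <- createExternalTable("people_ext2", path = "path/to/people.json", source = "json")

Note the dispatch in the patch: with no schema the catalog's two-argument createTable is called; with a structType its Java object is passed through; anything else raises "schema must be a structType."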
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ad06711a79..58cf24256a 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -281,7 +281,7 @@ test_that("create DataFrame from RDD", {
   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
   df <- read.df(jsonPathNa, "json", schema)
-  invisible(insertInto(df, "people"))
+  insertInto(df, "people")
   expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
                c(16))
   expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
@@ -1268,7 +1268,16 @@ test_that("column calculation", {
 
 test_that("test HiveContext", {
   setHiveContext(sc)
-  df <- createExternalTable("json", jsonPath, "json")
+
+  schema <- structType(structField("name", "string"), structField("age", "integer"),
+                       structField("height", "float"))
+  createTable("people", source = "json", schema = schema)
+  df <- read.df(jsonPathNa, "json", schema)
+  insertInto(df, "people")
+  expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16))
+  sql("DROP TABLE people")
+
+  df <- createTable("json", jsonPath, "json")
   expect_is(df, "SparkDataFrame")
   expect_equal(count(df), 3)
   df2 <- sql("select * from json")
@@ -1276,25 +1285,26 @@ test_that("test HiveContext", {
   expect_equal(count(df2), 3)
 
   jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
+  saveAsTable(df, "json2", "json", "append", path = jsonPath2)
   df3 <- sql("select * from json2")
   expect_is(df3, "SparkDataFrame")
   expect_equal(count(df3), 3)
   unlink(jsonPath2)
 
   hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
+  saveAsTable(df, "hivetestbl", path = hivetestDataPath)
   df4 <- sql("select * from hivetestbl")
   expect_is(df4, "SparkDataFrame")
   expect_equal(count(df4), 3)
   unlink(hivetestDataPath)
 
   parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
+  saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)
   df5 <- sql("select * from parquetest")
   expect_is(df5, "SparkDataFrame")
   expect_equal(count(df5), 3)
   unlink(parquetDataPath)
+
   unsetHiveContext()
 })
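The invisible() wrappers added to insertInto() and saveAsTable() in DataFrame.R are what let the tests above drop their own invisible(...) calls: the methods no longer auto-print their return value at the console. A short sketch of the resulting behavior, reusing the hypothetical "people" table and schema from the earlier example:

    df <- read.df("path/to/people.json", "json", schema)

    # Both calls now return invisibly: nothing is echoed at the REPL, though a
    # caller can still capture the result with an explicit assignment.
    insertInto(df, "people")
    saveAsTable(df, "people_copy", source = "json", mode = "overwrite")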