author    Felix Cheung <felixcheung_m@hotmail.com>  2017-04-06 09:15:13 -0700
committer Felix Cheung <felixcheung@apache.org>     2017-04-06 09:15:13 -0700
commit    5a693b4138d4ce948e3bcdbe28d5c01d5deb8fa9 (patch)
tree      b71f548c8544c97b477e6a5fb69f895c03e27436 /R
parent    bccc330193217b2ec9660e06f1db6dd58f7af5d8 (diff)
[SPARK-20195][SPARKR][SQL] add createTable catalog API and deprecate createExternalTable
## What changes were proposed in this pull request?

Following up on #17483, add createTable (which is new in 2.2.0) and deprecate createExternalTable, plus a number of minor fixes.

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17511 from felixcheung/rceatetable.
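As a quick orientation, here is a minimal sketch of the API this commit introduces, assuming a running SparkR session and a JSON file at the hypothetical path /tmp/people.json:

library(SparkR)
sparkR.session()

# New in 2.2.0: when a path is given, an external table is created at that location
df <- createTable("json_tbl", path = "/tmp/people.json", source = "json")

# Without a path, a managed table is created; a schema is required for some sources
schema <- structType(structField("name", "string"), structField("age", "integer"))
createTable("people", source = "json", schema = schema)

# The old entry point still works, but now warns and delegates to createTable():
# df <- createExternalTable("json_tbl", path = "/tmp/people.json", source = "json")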
Diffstat (limited to 'R')
-rw-r--r--  R/pkg/NAMESPACE                              1
-rw-r--r--  R/pkg/R/DataFrame.R                          4
-rw-r--r--  R/pkg/R/catalog.R                           59
-rw-r--r--  R/pkg/inst/tests/testthat/test_sparkSQL.R   20
4 files changed, 68 insertions(+), 16 deletions(-)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 9b7e95ce30..ca45c6f9b0 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -361,6 +361,7 @@ export("as.DataFrame",
"clearCache",
"createDataFrame",
"createExternalTable",
+ "createTable",
"currentDatabase",
"dropTempTable",
"dropTempView",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 97786df4ae..ec85f723c0 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -557,7 +557,7 @@ setMethod("insertInto",
jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
- callJMethod(write, "insertInto", tableName)
+ invisible(callJMethod(write, "insertInto", tableName))
})
#' Cache
@@ -2894,7 +2894,7 @@ setMethod("saveAsTable",
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
- callJMethod(write, "saveAsTable", tableName)
+ invisible(callJMethod(write, "saveAsTable", tableName))
})
#' summary
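A side effect of the invisible() wrappers above: insertInto() and saveAsTable() no longer return a visible Java object reference at the console, so callers no longer need to wrap the calls in invisible() themselves. A short sketch, reusing the hypothetical df and tables from the example above:

insertInto(df, "people")                        # returns invisibly; no console output
saveAsTable(df, "people_copy", source = "json") # likewise returns invisibly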
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index 4b7f841b55..e59a702433 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -17,7 +17,7 @@
# catalog.R: SparkSession catalog functions
-#' Create an external table
+#' (Deprecated) Create an external table
#'
#' Creates an external table based on the dataset in a data source.
#' Returns a SparkDataFrame associated with the external table.
@@ -29,10 +29,11 @@
#' @param tableName a name of the table.
#' @param path the path of files to load.
#' @param source the name of external data source.
-#' @param schema the schema of the data for certain data source.
+#' @param schema the schema of the data required for some data sources.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
-#' @rdname createExternalTable
+#' @rdname createExternalTable-deprecated
+#' @seealso \link{createTable}
#' @export
#' @examples
#'\dontrun{
@@ -43,24 +44,64 @@
#' @method createExternalTable default
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
+ .Deprecated("createTable", old = "createExternalTable")
+ createTable(tableName, path, source, schema, ...)
+}
+
+createExternalTable <- function(x, ...) {
+ dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
+}
+
+#' Creates a table based on the dataset in a data source
+#'
+#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
+#' the table.
+#'
+#' The data source is specified by the \code{source} and a set of options (...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
+#' created from the data at the given path. Otherwise a managed table is created.
+#'
+#' @param tableName the qualified or unqualified name that designates a table. If no database
+#' identifier is provided, it refers to a table in the current database.
+#' @param path (optional) the path of files to load.
+#' @param source (optional) the name of the data source.
+#' @param schema (optional) the schema of the data required for some data sources.
+#' @param ... additional named parameters as options for the data source.
+#' @return A SparkDataFrame.
+#' @rdname createTable
+#' @seealso \link{createExternalTable}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createTable("myjson", path = "path/to/json", source = "json", schema)
+#'
+#' createTable("people", source = "json", schema = schema)
+#' insertInto(df, "people")
+#' }
+#' @name createTable
+#' @note createTable since 2.2.0
+createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
+ if (is.null(source)) {
+ source <- getDefaultSqlSource()
+ }
catalog <- callJMethod(sparkSession, "catalog")
if (is.null(schema)) {
- sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
+ sdf <- callJMethod(catalog, "createTable", tableName, source, options)
+ } else if (class(schema) == "structType") {
+ sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
} else {
- sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
+ stop("schema must be a structType.")
}
dataFrame(sdf)
}
-createExternalTable <- function(x, ...) {
- dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
-}
-
#' Cache Table
#'
#' Caches the specified table in-memory.
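Two behavioral details of the new createTable() are visible in the hunk above: when source is NULL it falls back to getDefaultSqlSource(), and a schema that is not a structType is rejected before any JVM call is made. A sketch of the validation path (table names are illustrative):

schema <- structType(structField("name", "string"))
createTable("people3", source = "json", schema = schema)  # managed table, no path

# A non-structType schema now fails fast:
# createTable("people4", source = "json", schema = "name string")
# Error: schema must be a structType.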
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ad06711a79..58cf24256a 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -281,7 +281,7 @@ test_that("create DataFrame from RDD", {
setHiveContext(sc)
sql("CREATE TABLE people (name string, age double, height float)")
df <- read.df(jsonPathNa, "json", schema)
- invisible(insertInto(df, "people"))
+ insertInto(df, "people")
expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
c(16))
expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
@@ -1268,7 +1268,16 @@ test_that("column calculation", {
test_that("test HiveContext", {
setHiveContext(sc)
- df <- createExternalTable("json", jsonPath, "json")
+
+ schema <- structType(structField("name", "string"), structField("age", "integer"),
+ structField("height", "float"))
+ createTable("people", source = "json", schema = schema)
+ df <- read.df(jsonPathNa, "json", schema)
+ insertInto(df, "people")
+ expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16))
+ sql("DROP TABLE people")
+
+ df <- createTable("json", jsonPath, "json")
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
df2 <- sql("select * from json")
@@ -1276,25 +1285,26 @@ test_that("test HiveContext", {
expect_equal(count(df2), 3)
jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
- invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
+ saveAsTable(df, "json2", "json", "append", path = jsonPath2)
df3 <- sql("select * from json2")
expect_is(df3, "SparkDataFrame")
expect_equal(count(df3), 3)
unlink(jsonPath2)
hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
- invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
+ saveAsTable(df, "hivetestbl", path = hivetestDataPath)
df4 <- sql("select * from hivetestbl")
expect_is(df4, "SparkDataFrame")
expect_equal(count(df4), 3)
unlink(hivetestDataPath)
parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
- invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
+ saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)
df5 <- sql("select * from parquetest")
expect_is(df5, "SparkDataFrame")
expect_equal(count(df5), 3)
unlink(parquetDataPath)
+
unsetHiveContext()
})
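For reference, the deprecation shim added in catalog.R surfaces at the console roughly as follows (warning text follows base R's .Deprecated() format; jsonPath is a hypothetical JSON file):

df <- createExternalTable("json", jsonPath, "json")
# Warning message:
# 'createExternalTable' is deprecated.
# Use 'createTable' instead.
# See help("Deprecated")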