author      Felix Cheung <felixcheung_m@hotmail.com>    2017-04-02 11:59:27 -0700
committer   Felix Cheung <felixcheung@apache.org>       2017-04-02 11:59:27 -0700
commit      93dbfe705f3e7410a7267e406332ffb3c3077829 (patch)
tree        de810d4351b4cafe94e112e777058804ce4e6d8e /R
parent      657cb9541db8508ce64d08cc3de14cd02adf16b5 (diff)
[SPARK-20159][SPARKR][SQL] Support all catalog API in R
## What changes were proposed in this pull request?

Add a set of catalog API in R

```
"currentDatabase",
"listColumns",
"listDatabases",
"listFunctions",
"listTables",
"recoverPartitions",
"refreshByPath",
"refreshTable",
"setCurrentDatabase",
```

https://github.com/apache/spark/pull/17483/files#diff-6929e6c5e59017ff954e110df20ed7ff

## How was this patch tested?

manual tests, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17483 from felixcheung/rcatalog.
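As a quick orientation, below is a minimal usage sketch of the new functions, pieced together from the roxygen examples in `catalog.R` and the updated tests; it assumes a running SparkR session and a temporary view named "cars" (as in the tests), not any particular deployment.

```r
library(SparkR)
sparkR.session()

# register a temporary view so the catalog has something to report on
createOrReplaceTempView(as.DataFrame(cars), "cars")

currentDatabase()                # "default"
setCurrentDatabase("default")    # no-op here; errors on a missing database

collect(listDatabases())         # columns: name, description, locationUri
collect(listTables("default"))   # includes temporary views such as "cars"
collect(listColumns("cars"))     # name, description, dataType, nullable, isPartition, isBucket
head(listFunctions())            # registered functions, including temporary ones

refreshTable("cars")             # refresh cached metadata for a table
refreshByPath("/")               # prefix match; "/" refreshes everything cached
# recoverPartitions("cars")      # would error: only valid for a partitioned (non-temporary) table

dropTempView("cars")
```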
Diffstat (limited to 'R')
-rw-r--r--   R/pkg/DESCRIPTION                             1
-rw-r--r--   R/pkg/NAMESPACE                               9
-rw-r--r--   R/pkg/R/SQLContext.R                        233
-rw-r--r--   R/pkg/R/catalog.R                           479
-rw-r--r--   R/pkg/R/utils.R                              18
-rw-r--r--   R/pkg/inst/tests/testthat/test_sparkSQL.R    66
6 files changed, 569 insertions, 237 deletions
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 2ea90f7d36..00dde64324 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -32,6 +32,7 @@ Collate:
'pairRDD.R'
'DataFrame.R'
'SQLContext.R'
+ 'catalog.R'
'WindowSpec.R'
'backend.R'
'broadcast.R'
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8be7875ad2..c02046c94b 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -358,9 +358,14 @@ export("as.DataFrame",
"clearCache",
"createDataFrame",
"createExternalTable",
+ "currentDatabase",
"dropTempTable",
"dropTempView",
"jsonFile",
+ "listColumns",
+ "listDatabases",
+ "listFunctions",
+ "listTables",
"loadDF",
"parquetFile",
"read.df",
@@ -370,7 +375,11 @@ export("as.DataFrame",
"read.parquet",
"read.stream",
"read.text",
+ "recoverPartitions",
+ "refreshByPath",
+ "refreshTable",
"setCheckpointDir",
+ "setCurrentDatabase",
"spark.lapply",
"spark.addFile",
"spark.getSparkFilesRootDirectory",
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index b75fb0159d..a1edef7608 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -569,200 +569,6 @@ tableToDF <- function(tableName) {
dataFrame(sdf)
}
-#' Tables
-#'
-#' Returns a SparkDataFrame containing names of tables in the given database.
-#'
-#' @param databaseName name of the database
-#' @return a SparkDataFrame
-#' @rdname tables
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' tables("hive")
-#' }
-#' @name tables
-#' @method tables default
-#' @note tables since 1.4.0
-tables.default <- function(databaseName = NULL) {
- sparkSession <- getSparkSession()
- jdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTables", sparkSession, databaseName)
- dataFrame(jdf)
-}
-
-tables <- function(x, ...) {
- dispatchFunc("tables(databaseName = NULL)", x, ...)
-}
-
-#' Table Names
-#'
-#' Returns the names of tables in the given database as an array.
-#'
-#' @param databaseName name of the database
-#' @return a list of table names
-#' @rdname tableNames
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' tableNames("hive")
-#' }
-#' @name tableNames
-#' @method tableNames default
-#' @note tableNames since 1.4.0
-tableNames.default <- function(databaseName = NULL) {
- sparkSession <- getSparkSession()
- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
- "getTableNames",
- sparkSession,
- databaseName)
-}
-
-tableNames <- function(x, ...) {
- dispatchFunc("tableNames(databaseName = NULL)", x, ...)
-}
-
-#' Cache Table
-#'
-#' Caches the specified table in-memory.
-#'
-#' @param tableName The name of the table being cached
-#' @return SparkDataFrame
-#' @rdname cacheTable
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' path <- "path/to/file.json"
-#' df <- read.json(path)
-#' createOrReplaceTempView(df, "table")
-#' cacheTable("table")
-#' }
-#' @name cacheTable
-#' @method cacheTable default
-#' @note cacheTable since 1.4.0
-cacheTable.default <- function(tableName) {
- sparkSession <- getSparkSession()
- catalog <- callJMethod(sparkSession, "catalog")
- invisible(callJMethod(catalog, "cacheTable", tableName))
-}
-
-cacheTable <- function(x, ...) {
- dispatchFunc("cacheTable(tableName)", x, ...)
-}
-
-#' Uncache Table
-#'
-#' Removes the specified table from the in-memory cache.
-#'
-#' @param tableName The name of the table being uncached
-#' @return SparkDataFrame
-#' @rdname uncacheTable
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' path <- "path/to/file.json"
-#' df <- read.json(path)
-#' createOrReplaceTempView(df, "table")
-#' uncacheTable("table")
-#' }
-#' @name uncacheTable
-#' @method uncacheTable default
-#' @note uncacheTable since 1.4.0
-uncacheTable.default <- function(tableName) {
- sparkSession <- getSparkSession()
- catalog <- callJMethod(sparkSession, "catalog")
- invisible(callJMethod(catalog, "uncacheTable", tableName))
-}
-
-uncacheTable <- function(x, ...) {
- dispatchFunc("uncacheTable(tableName)", x, ...)
-}
-
-#' Clear Cache
-#'
-#' Removes all cached tables from the in-memory cache.
-#'
-#' @rdname clearCache
-#' @export
-#' @examples
-#' \dontrun{
-#' clearCache()
-#' }
-#' @name clearCache
-#' @method clearCache default
-#' @note clearCache since 1.4.0
-clearCache.default <- function() {
- sparkSession <- getSparkSession()
- catalog <- callJMethod(sparkSession, "catalog")
- invisible(callJMethod(catalog, "clearCache"))
-}
-
-clearCache <- function() {
- dispatchFunc("clearCache()")
-}
-
-#' (Deprecated) Drop Temporary Table
-#'
-#' Drops the temporary table with the given table name in the catalog.
-#' If the table has been cached/persisted before, it's also unpersisted.
-#'
-#' @param tableName The name of the SparkSQL table to be dropped.
-#' @seealso \link{dropTempView}
-#' @rdname dropTempTable-deprecated
-#' @export
-#' @examples
-#' \dontrun{
-#' sparkR.session()
-#' df <- read.df(path, "parquet")
-#' createOrReplaceTempView(df, "table")
-#' dropTempTable("table")
-#' }
-#' @name dropTempTable
-#' @method dropTempTable default
-#' @note dropTempTable since 1.4.0
-dropTempTable.default <- function(tableName) {
- if (class(tableName) != "character") {
- stop("tableName must be a string.")
- }
- dropTempView(tableName)
-}
-
-dropTempTable <- function(x, ...) {
- .Deprecated("dropTempView")
- dispatchFunc("dropTempView(viewName)", x, ...)
-}
-
-#' Drops the temporary view with the given view name in the catalog.
-#'
-#' Drops the temporary view with the given view name in the catalog.
-#' If the view has been cached before, then it will also be uncached.
-#'
-#' @param viewName the name of the view to be dropped.
-#' @return TRUE if the view is dropped successfully, FALSE otherwise.
-#' @rdname dropTempView
-#' @name dropTempView
-#' @export
-#' @examples
-#' \dontrun{
-#' sparkR.session()
-#' df <- read.df(path, "parquet")
-#' createOrReplaceTempView(df, "table")
-#' dropTempView("table")
-#' }
-#' @note since 2.0.0
-
-dropTempView <- function(viewName) {
- sparkSession <- getSparkSession()
- if (class(viewName) != "character") {
- stop("viewName must be a string.")
- }
- catalog <- callJMethod(sparkSession, "catalog")
- callJMethod(catalog, "dropTempView", viewName)
-}
-
#' Load a SparkDataFrame
#'
#' Returns the dataset in a data source as a SparkDataFrame
@@ -841,45 +647,6 @@ loadDF <- function(x = NULL, ...) {
dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}
-#' Create an external table
-#'
-#' Creates an external table based on the dataset in a data source,
-#' Returns a SparkDataFrame associated with the external table.
-#'
-#' The data source is specified by the \code{source} and a set of options(...).
-#' If \code{source} is not specified, the default data source configured by
-#' "spark.sql.sources.default" will be used.
-#'
-#' @param tableName a name of the table.
-#' @param path the path of files to load.
-#' @param source the name of external data source.
-#' @param ... additional argument(s) passed to the method.
-#' @return A SparkDataFrame.
-#' @rdname createExternalTable
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.session()
-#' df <- createExternalTable("myjson", path="path/to/json", source="json")
-#' }
-#' @name createExternalTable
-#' @method createExternalTable default
-#' @note createExternalTable since 1.4.0
-createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
- sparkSession <- getSparkSession()
- options <- varargsToStrEnv(...)
- if (!is.null(path)) {
- options[["path"]] <- path
- }
- catalog <- callJMethod(sparkSession, "catalog")
- sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
- dataFrame(sdf)
-}
-
-createExternalTable <- function(x, ...) {
- dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
-}
-
#' Create a SparkDataFrame representing the database table accessible via JDBC URL
#'
#' Additional JDBC database connection properties can be set (...)
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
new file mode 100644
index 0000000000..07a89f763c
--- /dev/null
+++ b/R/pkg/R/catalog.R
@@ -0,0 +1,479 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# catalog.R: SparkSession catalog functions
+
+#' Create an external table
+#'
+#' Creates an external table based on the dataset in a data source,
+#' Returns a SparkDataFrame associated with the external table.
+#'
+#' The data source is specified by the \code{source} and a set of options(...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param tableName a name of the table.
+#' @param path the path of files to load.
+#' @param source the name of external data source.
+#' @param schema the schema of the data for certain data source.
+#' @param ... additional argument(s) passed to the method.
+#' @return A SparkDataFrame.
+#' @rdname createExternalTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createExternalTable("myjson", path="path/to/json", source="json", schema)
+#' }
+#' @name createExternalTable
+#' @method createExternalTable default
+#' @note createExternalTable since 1.4.0
+createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
+ sparkSession <- getSparkSession()
+ options <- varargsToStrEnv(...)
+ if (!is.null(path)) {
+ options[["path"]] <- path
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ if (is.null(schema)) {
+ sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
+ } else {
+ sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
+ }
+ dataFrame(sdf)
+}
+
+createExternalTable <- function(x, ...) {
+ dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
+}
+
+#' Cache Table
+#'
+#' Caches the specified table in-memory.
+#'
+#' @param tableName The name of the table being cached
+#' @return SparkDataFrame
+#' @rdname cacheTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' createOrReplaceTempView(df, "table")
+#' cacheTable("table")
+#' }
+#' @name cacheTable
+#' @method cacheTable default
+#' @note cacheTable since 1.4.0
+cacheTable.default <- function(tableName) {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ invisible(handledCallJMethod(catalog, "cacheTable", tableName))
+}
+
+cacheTable <- function(x, ...) {
+ dispatchFunc("cacheTable(tableName)", x, ...)
+}
+
+#' Uncache Table
+#'
+#' Removes the specified table from the in-memory cache.
+#'
+#' @param tableName The name of the table being uncached
+#' @return SparkDataFrame
+#' @rdname uncacheTable
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' createOrReplaceTempView(df, "table")
+#' uncacheTable("table")
+#' }
+#' @name uncacheTable
+#' @method uncacheTable default
+#' @note uncacheTable since 1.4.0
+uncacheTable.default <- function(tableName) {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ invisible(handledCallJMethod(catalog, "uncacheTable", tableName))
+}
+
+uncacheTable <- function(x, ...) {
+ dispatchFunc("uncacheTable(tableName)", x, ...)
+}
+
+#' Clear Cache
+#'
+#' Removes all cached tables from the in-memory cache.
+#'
+#' @rdname clearCache
+#' @export
+#' @examples
+#' \dontrun{
+#' clearCache()
+#' }
+#' @name clearCache
+#' @method clearCache default
+#' @note clearCache since 1.4.0
+clearCache.default <- function() {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ invisible(callJMethod(catalog, "clearCache"))
+}
+
+clearCache <- function() {
+ dispatchFunc("clearCache()")
+}
+
+#' (Deprecated) Drop Temporary Table
+#'
+#' Drops the temporary table with the given table name in the catalog.
+#' If the table has been cached/persisted before, it's also unpersisted.
+#'
+#' @param tableName The name of the SparkSQL table to be dropped.
+#' @seealso \link{dropTempView}
+#' @rdname dropTempTable-deprecated
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' df <- read.df(path, "parquet")
+#' createOrReplaceTempView(df, "table")
+#' dropTempTable("table")
+#' }
+#' @name dropTempTable
+#' @method dropTempTable default
+#' @note dropTempTable since 1.4.0
+dropTempTable.default <- function(tableName) {
+ if (class(tableName) != "character") {
+ stop("tableName must be a string.")
+ }
+ dropTempView(tableName)
+}
+
+dropTempTable <- function(x, ...) {
+ .Deprecated("dropTempView")
+ dispatchFunc("dropTempView(viewName)", x, ...)
+}
+
+#' Drops the temporary view with the given view name in the catalog.
+#'
+#' Drops the temporary view with the given view name in the catalog.
+#' If the view has been cached before, then it will also be uncached.
+#'
+#' @param viewName the name of the view to be dropped.
+#' @return TRUE if the view is dropped successfully, FALSE otherwise.
+#' @rdname dropTempView
+#' @name dropTempView
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' df <- read.df(path, "parquet")
+#' createOrReplaceTempView(df, "table")
+#' dropTempView("table")
+#' }
+#' @note since 2.0.0
+dropTempView <- function(viewName) {
+ sparkSession <- getSparkSession()
+ if (class(viewName) != "character") {
+ stop("viewName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ callJMethod(catalog, "dropTempView", viewName)
+}
+
+#' Tables
+#'
+#' Returns a SparkDataFrame containing names of tables in the given database.
+#'
+#' @param databaseName (optional) name of the database
+#' @return a SparkDataFrame
+#' @rdname tables
+#' @seealso \link{listTables}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' tables("hive")
+#' }
+#' @name tables
+#' @method tables default
+#' @note tables since 1.4.0
+tables.default <- function(databaseName = NULL) {
+ # rename column to match previous output schema
+ withColumnRenamed(listTables(databaseName), "name", "tableName")
+}
+
+tables <- function(x, ...) {
+ dispatchFunc("tables(databaseName = NULL)", x, ...)
+}
+
+#' Table Names
+#'
+#' Returns the names of tables in the given database as an array.
+#'
+#' @param databaseName (optional) name of the database
+#' @return a list of table names
+#' @rdname tableNames
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' tableNames("hive")
+#' }
+#' @name tableNames
+#' @method tableNames default
+#' @note tableNames since 1.4.0
+tableNames.default <- function(databaseName = NULL) {
+ sparkSession <- getSparkSession()
+ callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+ "getTableNames",
+ sparkSession,
+ databaseName)
+}
+
+tableNames <- function(x, ...) {
+ dispatchFunc("tableNames(databaseName = NULL)", x, ...)
+}
+
+#' Returns the current default database
+#'
+#' Returns the current default database.
+#'
+#' @return name of the current default database.
+#' @rdname currentDatabase
+#' @name currentDatabase
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' currentDatabase()
+#' }
+#' @note since 2.2.0
+currentDatabase <- function() {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ callJMethod(catalog, "currentDatabase")
+}
+
+#' Sets the current default database
+#'
+#' Sets the current default database.
+#'
+#' @param databaseName name of the database
+#' @rdname setCurrentDatabase
+#' @name setCurrentDatabase
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' setCurrentDatabase("default")
+#' }
+#' @note since 2.2.0
+setCurrentDatabase <- function(databaseName) {
+ sparkSession <- getSparkSession()
+ if (class(databaseName) != "character") {
+ stop("databaseName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
+}
+
+#' Returns a list of databases available
+#'
+#' Returns a list of databases available.
+#'
+#' @return a SparkDataFrame of the list of databases.
+#' @rdname listDatabases
+#' @name listDatabases
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listDatabases()
+#' }
+#' @note since 2.2.0
+listDatabases <- function() {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF"))
+}
+
+#' Returns a list of tables in the specified database
+#'
+#' Returns a list of tables in the specified database.
+#' This includes all temporary tables.
+#'
+#' @param databaseName (optional) name of the database
+#' @return a SparkDataFrame of the list of tables.
+#' @rdname listTables
+#' @name listTables
+#' @seealso \link{tables}
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listTables()
+#' listTables("default")
+#' }
+#' @note since 2.2.0
+listTables <- function(databaseName = NULL) {
+ sparkSession <- getSparkSession()
+ if (!is.null(databaseName) && class(databaseName) != "character") {
+ stop("databaseName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ jdst <- if (is.null(databaseName)) {
+ callJMethod(catalog, "listTables")
+ } else {
+ handledCallJMethod(catalog, "listTables", databaseName)
+ }
+ dataFrame(callJMethod(jdst, "toDF"))
+}
+
+#' Returns a list of columns for the given table in the specified database
+#'
+#' Returns a list of columns for the given table in the specified database.
+#'
+#' @param tableName a name of the table.
+#' @param databaseName (optional) name of the database
+#' @return a SparkDataFrame of the list of column descriptions.
+#' @rdname listColumns
+#' @name listColumns
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listColumns("mytable")
+#' }
+#' @note since 2.2.0
+listColumns <- function(tableName, databaseName = NULL) {
+ sparkSession <- getSparkSession()
+ if (!is.null(databaseName) && class(databaseName) != "character") {
+ stop("databaseName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ jdst <- if (is.null(databaseName)) {
+ handledCallJMethod(catalog, "listColumns", tableName)
+ } else {
+ handledCallJMethod(catalog, "listColumns", databaseName, tableName)
+ }
+ dataFrame(callJMethod(jdst, "toDF"))
+}
+
+#' Returns a list of functions registered in the specified database
+#'
+#' Returns a list of functions registered in the specified database.
+#' This includes all temporary functions.
+#'
+#' @param databaseName (optional) name of the database
+#' @return a SparkDataFrame of the list of function descriptions.
+#' @rdname listFunctions
+#' @name listFunctions
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' listFunctions()
+#' }
+#' @note since 2.2.0
+listFunctions <- function(databaseName = NULL) {
+ sparkSession <- getSparkSession()
+ if (!is.null(databaseName) && class(databaseName) != "character") {
+ stop("databaseName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ jdst <- if (is.null(databaseName)) {
+ callJMethod(catalog, "listFunctions")
+ } else {
+ handledCallJMethod(catalog, "listFunctions", databaseName)
+ }
+ dataFrame(callJMethod(jdst, "toDF"))
+}
+
+#' Recover all the partitions in the directory of a table and update the catalog
+#'
+#' Recover all the partitions in the directory of a table and update the catalog. The name should
+#' reference a partitioned table, and not a temporary view.
+#'
+#' @param tableName a name of the table.
+#' @rdname recoverPartitions
+#' @name recoverPartitions
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' recoverPartitions("myTable")
+#' }
+#' @note since 2.2.0
+recoverPartitions <- function(tableName) {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ invisible(handledCallJMethod(catalog, "recoverPartitions", tableName))
+}
+
+#' Invalidate and refresh all the cached metadata of the given table
+#'
+#' Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+#' Spark SQL or the external data source library it uses might cache certain metadata about a
+#' table, such as the location of blocks. When those change outside of Spark SQL, users should
+#' call this function to invalidate the cache.
+#'
+#' If this table is cached as an InMemoryRelation, drop the original cached version and make the
+#' new version cached lazily.
+#'
+#' @param tableName a name of the table.
+#' @rdname refreshTable
+#' @name refreshTable
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' refreshTable("myTable")
+#' }
+#' @note since 2.2.0
+refreshTable <- function(tableName) {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ invisible(handledCallJMethod(catalog, "refreshTable", tableName))
+}
+
+#' Invalidate and refresh all the cached data and metadata for SparkDataFrame containing path
+#'
+#' Invalidate and refresh all the cached data (and the associated metadata) for any SparkDataFrame
+#' that contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
+#' everything that is cached.
+#'
+#' @param path the path of the data source.
+#' @rdname refreshByPath
+#' @name refreshByPath
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' refreshByPath("/path")
+#' }
+#' @note since 2.2.0
+refreshByPath <- function(path) {
+ sparkSession <- getSparkSession()
+ catalog <- callJMethod(sparkSession, "catalog")
+ invisible(handledCallJMethod(catalog, "refreshByPath", path))
+}
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 810de9917e..fbc89e9884 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -846,6 +846,24 @@ captureJVMException <- function(e, method) {
# Extract the first message of JVM exception.
first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
stop(paste0(rmsg, "analysis error - ", first), call. = FALSE)
+ } else
+ if (any(grep("org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ", stacktrace))) {
+ msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ",
+ fixed = TRUE)[[1]]
+ # Extract "Error in ..." message.
+ rmsg <- msg[1]
+ # Extract the first message of JVM exception.
+ first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
+ stop(paste0(rmsg, "no such database - ", first), call. = FALSE)
+ } else
+ if (any(grep("org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ", stacktrace))) {
+ msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ",
+ fixed = TRUE)[[1]]
+ # Extract "Error in ..." message.
+ rmsg <- msg[1]
+ # Extract the first message of JVM exception.
+ first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
+ stop(paste0(rmsg, "no such table - ", first), call. = FALSE)
} else {
stop(stacktrace, call. = FALSE)
}
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 5acf8719d1..ad06711a79 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -645,16 +645,20 @@ test_that("test tableNames and tables", {
df <- read.json(jsonPath)
createOrReplaceTempView(df, "table1")
expect_equal(length(tableNames()), 1)
- tables <- tables()
+ expect_equal(length(tableNames("default")), 1)
+ tables <- listTables()
expect_equal(count(tables), 1)
+ expect_equal(count(tables()), count(tables))
+ expect_true("tableName" %in% colnames(tables()))
+ expect_true(all(c("tableName", "database", "isTemporary") %in% colnames(tables())))
suppressWarnings(registerTempTable(df, "table2"))
- tables <- tables()
+ tables <- listTables()
expect_equal(count(tables), 2)
suppressWarnings(dropTempTable("table1"))
expect_true(dropTempView("table2"))
- tables <- tables()
+ tables <- listTables()
expect_equal(count(tables), 0)
})
@@ -686,6 +690,9 @@ test_that("test cache, uncache and clearCache", {
uncacheTable("table1")
clearCache()
expect_true(dropTempView("table1"))
+
+ expect_error(uncacheTable("foo"),
+ "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'")
})
test_that("insertInto() on a registered table", {
@@ -2821,7 +2828,7 @@ test_that("createDataFrame sqlContext parameter backward compatibility", {
# more tests for SPARK-16538
createOrReplaceTempView(df, "table")
- SparkR::tables()
+ SparkR::listTables()
SparkR::sql("SELECT 1")
suppressWarnings(SparkR::sql(sqlContext, "SELECT * FROM table"))
suppressWarnings(SparkR::dropTempTable(sqlContext, "table"))
@@ -2977,6 +2984,57 @@ test_that("Collect on DataFrame when NAs exists at the top of a timestamp column
expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt"))
})
+test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
+ expect_equal(currentDatabase(), "default")
+ expect_error(setCurrentDatabase("default"), NA)
+ expect_error(setCurrentDatabase("foo"),
+ "Error in setCurrentDatabase : analysis error - Database 'foo' does not exist")
+ dbs <- collect(listDatabases())
+ expect_equal(names(dbs), c("name", "description", "locationUri"))
+ expect_equal(dbs[[1]], "default")
+})
+
+test_that("catalog APIs, listTables, listColumns, listFunctions", {
+ tb <- listTables()
+ count <- count(tables())
+ expect_equal(nrow(tb), count)
+ expect_equal(colnames(tb), c("name", "database", "description", "tableType", "isTemporary"))
+
+ createOrReplaceTempView(as.DataFrame(cars), "cars")
+
+ tb <- listTables()
+ expect_equal(nrow(tb), count + 1)
+ tbs <- collect(tb)
+ expect_true(nrow(tbs[tbs$name == "cars", ]) > 0)
+ expect_error(listTables("bar"),
+ "Error in listTables : no such database - Database 'bar' not found")
+
+ c <- listColumns("cars")
+ expect_equal(nrow(c), 2)
+ expect_equal(colnames(c),
+ c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
+ expect_equal(collect(c)[[1]][[1]], "speed")
+ expect_error(listColumns("foo", "default"),
+ "Error in listColumns : analysis error - Table 'foo' does not exist in database 'default'")
+
+ f <- listFunctions()
+ expect_true(nrow(f) >= 200) # 250
+ expect_equal(colnames(f),
+ c("name", "database", "description", "className", "isTemporary"))
+ expect_equal(take(orderBy(f, "className"), 1)$className,
+ "org.apache.spark.sql.catalyst.expressions.Abs")
+ expect_error(listFunctions("foo_db"),
+ "Error in listFunctions : analysis error - Database 'foo_db' does not exist")
+
+ # recoverPartitions does not work with temporary view
+ expect_error(recoverPartitions("cars"),
+ "no such table - Table or view 'cars' not found in database 'default'")
+ expect_error(refreshTable("cars"), NA)
+ expect_error(refreshByPath("/"), NA)
+
+ dropTempView("cars")
+})
+
compare_list <- function(list1, list2) {
# get testthat to show the diff by first making the 2 lists equal in length
expect_equal(length(list1), length(list2))