#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# catalog.R: SparkSession catalog functions
#' Create an external table
#'
#' Creates an external table based on the dataset in a data source,
#' Returns a SparkDataFrame associated with the external table.
#'
#' The data source is specified by the \code{source} and a set of options(...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
#' @param tableName a name of the table.
#' @param path the path of files to load.
#' @param source the name of external data source.
#' @param schema the schema of the data for certain data source.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
#' @rdname createExternalTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createExternalTable("myjson", path="path/to/json", source="json", schema)
#' }
#' @name createExternalTable
#' @method createExternalTable default
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
  sparkSession <- getSparkSession()
  # Gather data source options from ..., folding in `path` when supplied.
  options <- varargsToStrEnv(...)
  if (!is.null(path)) {
    options[["path"]] <- path
  }
  catalog <- callJMethod(sparkSession, "catalog")
  # Without a schema the JVM side infers one from the source; otherwise
  # pass the Java structType object held in schema$jobj.
  sdf <- if (is.null(schema)) {
    callJMethod(catalog, "createExternalTable", tableName, source, options)
  } else {
    callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
  }
  dataFrame(sdf)
}
# S3-style dispatch entry point for createExternalTable. The string argument
# is the usage signature dispatchFunc() reports when invoked incorrectly.
createExternalTable <- function(x, ...) {
dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
}
#' Cache Table
#'
#' Caches the specified table in-memory.
#'
#' @param tableName The name of the table being cached
#' @return SparkDataFrame
#' @rdname cacheTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' cacheTable("table")
#' }
#' @name cacheTable
#' @method cacheTable default
#' @note cacheTable since 1.4.0
cacheTable.default <- function(tableName) {
  # Cache the named table in memory through the session catalog; the JVM
  # call's return value is suppressed since the function is used for effect.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "cacheTable", tableName))
}
# S3-style dispatch entry point for cacheTable; the string is the usage
# signature reported by dispatchFunc() on invalid invocation.
cacheTable <- function(x, ...) {
dispatchFunc("cacheTable(tableName)", x, ...)
}
#' Uncache Table
#'
#' Removes the specified table from the in-memory cache.
#'
#' @param tableName The name of the table being uncached
#' @return SparkDataFrame
#' @rdname uncacheTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' uncacheTable("table")
#' }
#' @name uncacheTable
#' @method uncacheTable default
#' @note uncacheTable since 1.4.0
uncacheTable.default <- function(tableName) {
  # Drop the named table from the in-memory cache; called for its side
  # effect, so the JVM result is returned invisibly.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "uncacheTable", tableName))
}
# S3-style dispatch entry point for uncacheTable; the string is the usage
# signature reported by dispatchFunc() on invalid invocation.
uncacheTable <- function(x, ...) {
dispatchFunc("uncacheTable(tableName)", x, ...)
}
#' Clear Cache
#'
#' Removes all cached tables from the in-memory cache.
#'
#' @rdname clearCache
#' @export
#' @examples
#' \dontrun{
#' clearCache()
#' }
#' @name clearCache
#' @method clearCache default
#' @note clearCache since 1.4.0
clearCache.default <- function() {
  # Evict every cached table from the in-memory cache of the current session.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(callJMethod(jcatalog, "clearCache"))
}
# S3-style dispatch entry point for clearCache (takes no arguments).
clearCache <- function() {
dispatchFunc("clearCache()")
}
#' (Deprecated) Drop Temporary Table
#'
#' Drops the temporary table with the given table name in the catalog.
#' If the table has been cached/persisted before, it's also unpersisted.
#'
#' @param tableName The name of the SparkSQL table to be dropped.
#' @seealso \link{dropTempView}
#' @rdname dropTempTable-deprecated
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempTable("table")
#' }
#' @name dropTempTable
#' @method dropTempTable default
#' @note dropTempTable since 1.4.0
dropTempTable.default <- function(tableName) {
  # Deprecated alias for dropTempView(); validates the argument then delegates.
  # Use is.character() rather than comparing class(): class(x) can return a
  # vector of length > 1, and a length > 1 condition in if() is an error in
  # R >= 4.2 (and only tested the first element before that).
  if (!is.character(tableName)) {
    stop("tableName must be a string.")
  }
  dropTempView(tableName)
}
# Deprecated S3-style dispatch entry point: warns via .Deprecated() and points
# callers at dropTempView(). The usage string names the replacement function.
dropTempTable <- function(x, ...) {
.Deprecated("dropTempView")
dispatchFunc("dropTempView(viewName)", x, ...)
}
#' Drops the temporary view with the given view name in the catalog.
#'
#' Drops the temporary view with the given view name in the catalog.
#' If the view has been cached before, then it will also be uncached.
#'
#' @param viewName the name of the view to be dropped.
#' @return TRUE if the view is dropped successfully, FALSE otherwise.
#' @rdname dropTempView
#' @name dropTempView
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempView("table")
#' }
#' @note since 2.0.0
dropTempView <- function(viewName) {
  # Drop the temporary view named viewName from the session catalog.
  # Returns the JVM result (TRUE if the view was dropped, FALSE otherwise).
  sparkSession <- getSparkSession()
  # is.character() is the robust type test; comparing class(x) with != breaks
  # when class() returns more than one element (error in R >= 4.2).
  if (!is.character(viewName)) {
    stop("viewName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  callJMethod(catalog, "dropTempView", viewName)
}
#' Tables
#'
#' Returns a SparkDataFrame containing names of tables in the given database.
#'
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame
#' @rdname tables
#' @seealso \link{listTables}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' tables("hive")
#' }
#' @name tables
#' @method tables default
#' @note tables since 1.4.0
tables.default <- function(databaseName = NULL) {
  # listTables() exposes the column as "name"; rename it to "tableName" to
  # keep the historical output schema of tables().
  listing <- listTables(databaseName)
  withColumnRenamed(listing, "name", "tableName")
}
# S3-style dispatch entry point for tables; the string is the usage signature
# reported by dispatchFunc() on invalid invocation.
tables <- function(x, ...) {
dispatchFunc("tables(databaseName = NULL)", x, ...)
}
#' Table Names
#'
#' Returns the names of tables in the given database as an array.
#'
#' @param databaseName (optional) name of the database
#' @return a list of table names
#' @rdname tableNames
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' tableNames("hive")
#' }
#' @name tableNames
#' @method tableNames default
#' @note tableNames since 1.4.0
tableNames.default <- function(databaseName = NULL) {
  # Delegate to the JVM-side helper, which handles a NULL databaseName by
  # using the current database.
  callJStatic("org.apache.spark.sql.api.r.SQLUtils",
              "getTableNames",
              getSparkSession(),
              databaseName)
}
# S3-style dispatch entry point for tableNames; the string is the usage
# signature reported by dispatchFunc() on invalid invocation.
tableNames <- function(x, ...) {
dispatchFunc("tableNames(databaseName = NULL)", x, ...)
}
#' Returns the current default database
#'
#' Returns the current default database.
#'
#' @return name of the current default database.
#' @rdname currentDatabase
#' @name currentDatabase
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' currentDatabase()
#' }
#' @note since 2.2.0
currentDatabase <- function() {
  # Ask the session catalog for the name of the current default database.
  session <- getSparkSession()
  callJMethod(callJMethod(session, "catalog"), "currentDatabase")
}
#' Sets the current default database
#'
#' Sets the current default database.
#'
#' @param databaseName name of the database
#' @rdname setCurrentDatabase
#' @name setCurrentDatabase
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' setCurrentDatabase("default")
#' }
#' @note since 2.2.0
setCurrentDatabase <- function(databaseName) {
  # Set the session's current default database after validating the argument.
  sparkSession <- getSparkSession()
  # is.character() is the robust type test; comparing class(x) with != breaks
  # when class() returns more than one element (error in R >= 4.2).
  if (!is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
}
#' Returns a list of databases available
#'
#' Returns a list of databases available.
#'
#' @return a SparkDataFrame of the list of databases.
#' @rdname listDatabases
#' @name listDatabases
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' listDatabases()
#' }
#' @note since 2.2.0
listDatabases <- function() {
  # Fetch the catalog's database listing and convert the JVM Dataset to a
  # SparkDataFrame via toDF.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  jdf <- callJMethod(callJMethod(jcatalog, "listDatabases"), "toDF")
  dataFrame(jdf)
}
#' Returns a list of tables in the specified database
#'
#' Returns a list of tables in the specified database.
#' This includes all temporary tables.
#'
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of tables.
#' @rdname listTables
#' @name listTables
#' @seealso \link{tables}
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' listTables()
#' listTables("default")
#' }
#' @note since 2.2.0
listTables <- function(databaseName = NULL) {
  # List tables (including temporary ones) in databaseName, or in the current
  # database when databaseName is NULL; returns a SparkDataFrame.
  sparkSession <- getSparkSession()
  # is.character() is the robust type test; comparing class(x) with != breaks
  # when class() returns more than one element (error in R >= 4.2).
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  jdst <- if (is.null(databaseName)) {
    callJMethod(catalog, "listTables")
  } else {
    # handledCallJMethod translates a JVM "no such database" into an R error.
    handledCallJMethod(catalog, "listTables", databaseName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
#' Returns a list of columns for the given table in the specified database
#'
#' Returns a list of columns for the given table in the specified database.
#'
#' @param tableName a name of the table.
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of column descriptions.
#' @rdname listColumns
#' @name listColumns
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' listColumns("mytable")
#' }
#' @note since 2.2.0
listColumns <- function(tableName, databaseName = NULL) {
  # List column descriptions for tableName (optionally within databaseName);
  # returns a SparkDataFrame.
  sparkSession <- getSparkSession()
  # Validate tableName early, consistent with the sibling catalog functions.
  if (!is.character(tableName)) {
    stop("tableName must be a string.")
  }
  # is.character() is the robust type test; comparing class(x) with != breaks
  # when class() returns more than one element (error in R >= 4.2).
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  jdst <- if (is.null(databaseName)) {
    handledCallJMethod(catalog, "listColumns", tableName)
  } else {
    # Note the JVM overload takes (databaseName, tableName) in this order.
    handledCallJMethod(catalog, "listColumns", databaseName, tableName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
#' Returns a list of functions registered in the specified database
#'
#' Returns a list of functions registered in the specified database.
#' This includes all temporary functions.
#'
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of function descriptions.
#' @rdname listFunctions
#' @name listFunctions
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' listFunctions()
#' }
#' @note since 2.2.0
listFunctions <- function(databaseName = NULL) {
  # List functions (including temporary ones) registered in databaseName, or
  # in the current database when NULL; returns a SparkDataFrame.
  sparkSession <- getSparkSession()
  # is.character() is the robust type test; comparing class(x) with != breaks
  # when class() returns more than one element (error in R >= 4.2).
  if (!is.null(databaseName) && !is.character(databaseName)) {
    stop("databaseName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  jdst <- if (is.null(databaseName)) {
    callJMethod(catalog, "listFunctions")
  } else {
    # handledCallJMethod translates a JVM "no such database" into an R error.
    handledCallJMethod(catalog, "listFunctions", databaseName)
  }
  dataFrame(callJMethod(jdst, "toDF"))
}
#' Recover all the partitions in the directory of a table and update the catalog
#'
#' Recover all the partitions in the directory of a table and update the catalog. The name should
#' reference a partitioned table, and not a temporary view.
#'
#' @param tableName a name of the table.
#' @rdname recoverPartitions
#' @name recoverPartitions
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' recoverPartitions("myTable")
#' }
#' @note since 2.2.0
recoverPartitions <- function(tableName) {
  # Scan the partitioned table's directory and register any partitions found;
  # invoked for its side effect, so the result is returned invisibly.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "recoverPartitions", tableName))
}
#' Invalidate and refresh all the cached metadata of the given table
#'
#' Invalidate and refresh all the cached metadata of the given table. For performance reasons,
#' Spark SQL or the external data source library it uses might cache certain metadata about a
#' table, such as the location of blocks. When those change outside of Spark SQL, users should
#' call this function to invalidate the cache.
#'
#' If this table is cached as an InMemoryRelation, drop the original cached version and make the
#' new version cached lazily.
#'
#' @param tableName a name of the table.
#' @rdname refreshTable
#' @name refreshTable
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' refreshTable("myTable")
#' }
#' @note since 2.2.0
refreshTable <- function(tableName) {
  # Invalidate cached metadata (and lazily re-cache data) for the named table;
  # invoked for its side effect, so the result is returned invisibly.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "refreshTable", tableName))
}
#' Invalidate and refresh all the cached data and metadata for SparkDataFrame containing path
#'
#' Invalidate and refresh all the cached data (and the associated metadata) for any SparkDataFrame
#' that contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate
#' everything that is cached.
#'
#' @param path the path of the data source.
#' @rdname refreshByPath
#' @name refreshByPath
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' refreshByPath("/path")
#' }
#' @note since 2.2.0
refreshByPath <- function(path) {
  # Invalidate cached data/metadata for any SparkDataFrame whose source path
  # has `path` as a prefix; invoked for its side effect.
  jcatalog <- callJMethod(getSparkSession(), "catalog")
  invisible(handledCallJMethod(jcatalog, "refreshByPath", path))
}