aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@databricks.com>2016-09-29 17:56:32 -0700
committerReynold Xin <rxin@databricks.com>2016-09-29 17:56:32 -0700
commit74ac1c43817c0b8da70342e540ec7638dd7d01bd (patch)
tree2b804a8fb24074efeaa8537292ce98136eaa8b95
parent2f739567080d804a942cfcca0e22f91ab7cbea36 (diff)
downloadspark-74ac1c43817c0b8da70342e540ec7638dd7d01bd.tar.gz
spark-74ac1c43817c0b8da70342e540ec7638dd7d01bd.tar.bz2
spark-74ac1c43817c0b8da70342e540ec7638dd7d01bd.zip
[SPARK-17717][SQL] Add exist/find methods to Catalog.
## What changes were proposed in this pull request? The current user facing catalog does not implement methods for checking object existence or finding objects. You could theoretically do this using the `list*` commands, but this is rather cumbersome and can actually be costly when there are many objects. This PR adds `exists*` and `find*` methods for Databases, Table and Functions. ## How was this patch tested? Added tests to `org.apache.spark.sql.internal.CatalogSuite` Author: Herman van Hovell <hvanhovell@databricks.com> Closes #15301 from hvanhovell/SPARK-17717.
-rw-r--r--project/MimaExcludes.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala83
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala152
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala118
4 files changed, 339 insertions, 25 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4db3edb733..2ffe0ac9bc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -46,7 +46,16 @@ object MimaExcludes {
// [SPARK-16967] Move Mesos to Module
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"),
// [SPARK-16240] ML persistence backward compatibility for LDA
- ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"),
+ // [SPARK-17717] Add Find and Exists method to Catalog.
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findDatabase"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findTable"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findFunction"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.findColumn"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.columnExists")
)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 1aed245fdd..b439022d22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -102,6 +102,89 @@ abstract class Catalog {
def listColumns(dbName: String, tableName: String): Dataset[Column]
/**
+ * Find the database with the specified name. This throws an AnalysisException when the database
+ * cannot be found.
+ *
+ * @since 2.1.0
+ */
+ @throws[AnalysisException]("database does not exist")
+ def findDatabase(dbName: String): Database
+
+ /**
+ * Find the table with the specified name. This table can be a temporary table or a table in the
+ * current database. This throws an AnalysisException when the table cannot be found.
+ *
+ * @since 2.1.0
+ */
+ @throws[AnalysisException]("table does not exist")
+ def findTable(tableName: String): Table
+
+ /**
+ * Find the table with the specified name in the specified database. This throws an
+ * AnalysisException when the table cannot be found.
+ *
+ * @since 2.1.0
+ */
+ @throws[AnalysisException]("database or table does not exist")
+ def findTable(dbName: String, tableName: String): Table
+
+ /**
+ * Find the function with the specified name. This function can be a temporary function or a
+ * function in the current database. This throws an AnalysisException when the function cannot
+ * be found.
+ *
+ * @since 2.1.0
+ */
+ @throws[AnalysisException]("function does not exist")
+ def findFunction(functionName: String): Function
+
+ /**
+ * Find the function with the specified name. This throws an AnalysisException when the function
+ * cannot be found.
+ *
+ * @since 2.1.0
+ */
+ @throws[AnalysisException]("database or function does not exist")
+ def findFunction(dbName: String, functionName: String): Function
+
+ /**
+ * Check if the database with the specified name exists.
+ *
+ * @since 2.1.0
+ */
+ def databaseExists(dbName: String): Boolean
+
+ /**
+ * Check if the table with the specified name exists. This can either be a temporary table or a
+ * table in the current database.
+ *
+ * @since 2.1.0
+ */
+ def tableExists(tableName: String): Boolean
+
+ /**
+ * Check if the table with the specified name exists in the specified database.
+ *
+ * @since 2.1.0
+ */
+ def tableExists(dbName: String, tableName: String): Boolean
+
+ /**
+ * Check if the function with the specified name exists. This can either be a temporary function
+ * or a function in the current database.
+ *
+ * @since 2.1.0
+ */
+ def functionExists(functionName: String): Boolean
+
+ /**
+ * Check if the function with the specified name exists in the specified database.
+ *
+ * @since 2.1.0
+ */
+ def functionExists(dbName: String, functionName: String): Boolean
+
+ /**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f252535765..a1087edd03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,10 +23,10 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
-import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.types.StructType
@@ -69,15 +69,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def listDatabases(): Dataset[Database] = {
val databases = sessionCatalog.listDatabases().map { dbName =>
- val metadata = sessionCatalog.getDatabaseMetadata(dbName)
- new Database(
- name = metadata.name,
- description = metadata.description,
- locationUri = metadata.locationUri)
+ makeDatabase(sessionCatalog.getDatabaseMetadata(dbName))
}
CatalogImpl.makeDataset(databases, sparkSession)
}
+ private def makeDatabase(metadata: CatalogDatabase): Database = {
+ new Database(
+ name = metadata.name,
+ description = metadata.description,
+ locationUri = metadata.locationUri)
+ }
+
/**
* Returns a list of tables in the current database.
* This includes all temporary tables.
@@ -94,18 +97,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
override def listTables(dbName: String): Dataset[Table] = {
requireDatabaseExists(dbName)
val tables = sessionCatalog.listTables(dbName).map { tableIdent =>
- val isTemp = tableIdent.database.isEmpty
- val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent))
- new Table(
- name = tableIdent.identifier,
- database = metadata.flatMap(_.identifier.database).orNull,
- description = metadata.flatMap(_.comment).orNull,
- tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"),
- isTemporary = isTemp)
+ makeTable(tableIdent, tableIdent.database.isEmpty)
}
CatalogImpl.makeDataset(tables, sparkSession)
}
+ private def makeTable(tableIdent: TableIdentifier, isTemp: Boolean): Table = {
+ val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent))
+ new Table(
+ name = tableIdent.identifier,
+ database = metadata.flatMap(_.identifier.database).orNull,
+ description = metadata.flatMap(_.comment).orNull,
+ tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"),
+ isTemporary = isTemp)
+ }
+
/**
* Returns a list of functions registered in the current database.
* This includes all temporary functions
@@ -121,18 +127,22 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
@throws[AnalysisException]("database does not exist")
override def listFunctions(dbName: String): Dataset[Function] = {
requireDatabaseExists(dbName)
- val functions = sessionCatalog.listFunctions(dbName).map { case (funcIdent, _) =>
- val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
- new Function(
- name = funcIdent.identifier,
- database = funcIdent.database.orNull,
- description = null, // for now, this is always undefined
- className = metadata.getClassName,
- isTemporary = funcIdent.database.isEmpty)
+ val functions = sessionCatalog.listFunctions(dbName).map { case (functIdent, _) =>
+ makeFunction(functIdent)
}
CatalogImpl.makeDataset(functions, sparkSession)
}
+ private def makeFunction(funcIdent: FunctionIdentifier): Function = {
+ val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
+ new Function(
+ name = funcIdent.identifier,
+ database = funcIdent.database.orNull,
+ description = null, // for now, this is always undefined
+ className = metadata.getClassName,
+ isTemporary = funcIdent.database.isEmpty)
+ }
+
/**
* Returns a list of columns for the given table in the current database.
*/
@@ -168,6 +178,100 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
}
/**
+ * Find the database with the specified name. This throws an [[AnalysisException]] when no
+ * [[Database]] can be found.
+ */
+ override def findDatabase(dbName: String): Database = {
+ if (sessionCatalog.databaseExists(dbName)) {
+ makeDatabase(sessionCatalog.getDatabaseMetadata(dbName))
+ } else {
+ throw new AnalysisException(s"The specified database $dbName does not exist.")
+ }
+ }
+
+ /**
+ * Find the table with the specified name. This table can be a temporary table or a table in the
+ * current database. This throws an [[AnalysisException]] when no [[Table]] can be found.
+ */
+ override def findTable(tableName: String): Table = {
+ findTable(null, tableName)
+ }
+
+ /**
+ * Find the table with the specified name in the specified database. This throws an
+ * [[AnalysisException]] when no [[Table]] can be found.
+ */
+ override def findTable(dbName: String, tableName: String): Table = {
+ val tableIdent = TableIdentifier(tableName, Option(dbName))
+ val isTemporary = sessionCatalog.isTemporaryTable(tableIdent)
+ if (isTemporary || sessionCatalog.tableExists(tableIdent)) {
+ makeTable(tableIdent, isTemporary)
+ } else {
+ throw new AnalysisException(s"The specified table $tableIdent does not exist.")
+ }
+ }
+
+ /**
+ * Find the function with the specified name. This function can be a temporary function or a
+ * function in the current database. This throws an [[AnalysisException]] when no [[Function]]
+ * can be found.
+ */
+ override def findFunction(functionName: String): Function = {
+ findFunction(null, functionName)
+ }
+
+ /**
+ * Find the function with the specified name. This returns [[None]] when no [[Function]] can be
+ * found.
+ */
+ override def findFunction(dbName: String, functionName: String): Function = {
+ val functionIdent = FunctionIdentifier(functionName, Option(dbName))
+ if (sessionCatalog.functionExists(functionIdent)) {
+ makeFunction(functionIdent)
+ } else {
+ throw new AnalysisException(s"The specified function $functionIdent does not exist.")
+ }
+ }
+
+ /**
+ * Check if the database with the specified name exists.
+ */
+ override def databaseExists(dbName: String): Boolean = {
+ sessionCatalog.databaseExists(dbName)
+ }
+
+ /**
+ * Check if the table with the specified name exists. This can either be a temporary table or a
+ * table in the current database.
+ */
+ override def tableExists(tableName: String): Boolean = {
+ tableExists(null, tableName)
+ }
+
+ /**
+ * Check if the table with the specified name exists in the specified database.
+ */
+ override def tableExists(dbName: String, tableName: String): Boolean = {
+ val tableIdent = TableIdentifier(tableName, Option(dbName))
+ sessionCatalog.isTemporaryTable(tableIdent) || sessionCatalog.tableExists(tableIdent)
+ }
+
+ /**
+ * Check if the function with the specified name exists. This can either be a temporary function
+ * or a function in the current database.
+ */
+ override def functionExists(functionName: String): Boolean = {
+ functionExists(null, functionName)
+ }
+
+ /**
+ * Check if the function with the specified name exists in the specified database.
+ */
+ override def functionExists(dbName: String, functionName: String): Boolean = {
+ sessionCatalog.functionExists(FunctionIdentifier(functionName, Option(dbName)))
+ }
+
+ /**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 3dc67ffafb..783bf77f86 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -340,6 +340,124 @@ class CatalogSuite
}
}
+ test("find database") {
+ intercept[AnalysisException](spark.catalog.findDatabase("db10"))
+ withTempDatabase { db =>
+ assert(spark.catalog.findDatabase(db).name === db)
+ }
+ }
+
+ test("find table") {
+ withTempDatabase { db =>
+ withTable(s"tbl_x", s"$db.tbl_y") {
+ // Try to find non existing tables.
+ intercept[AnalysisException](spark.catalog.findTable("tbl_x"))
+ intercept[AnalysisException](spark.catalog.findTable("tbl_y"))
+ intercept[AnalysisException](spark.catalog.findTable(db, "tbl_y"))
+
+ // Create objects.
+ createTempTable("tbl_x")
+ createTable("tbl_y", Some(db))
+
+ // Find a temporary table
+ assert(spark.catalog.findTable("tbl_x").name === "tbl_x")
+
+ // Find a qualified table
+ assert(spark.catalog.findTable(db, "tbl_y").name === "tbl_y")
+
+ // Find an unqualified table using the current database
+ intercept[AnalysisException](spark.catalog.findTable("tbl_y"))
+ spark.catalog.setCurrentDatabase(db)
+ assert(spark.catalog.findTable("tbl_y").name === "tbl_y")
+ }
+ }
+ }
+
+ test("find function") {
+ withTempDatabase { db =>
+ withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) {
+ // Try to find non existing functions.
+ intercept[AnalysisException](spark.catalog.findFunction("fn1"))
+ intercept[AnalysisException](spark.catalog.findFunction("fn2"))
+ intercept[AnalysisException](spark.catalog.findFunction(db, "fn2"))
+
+ // Create objects.
+ createTempFunction("fn1")
+ createFunction("fn2", Some(db))
+
+ // Find a temporary function
+ assert(spark.catalog.findFunction("fn1").name === "fn1")
+
+ // Find a qualified function
+ assert(spark.catalog.findFunction(db, "fn2").name === "fn2")
+
+ // Find an unqualified function using the current database
+ intercept[AnalysisException](spark.catalog.findFunction("fn2"))
+ spark.catalog.setCurrentDatabase(db)
+ assert(spark.catalog.findFunction("fn2").name === "fn2")
+ }
+ }
+ }
+
+ test("database exists") {
+ assert(!spark.catalog.databaseExists("db10"))
+ createDatabase("db10")
+ assert(spark.catalog.databaseExists("db10"))
+ dropDatabase("db10")
+ }
+
+ test("table exists") {
+ withTempDatabase { db =>
+ withTable(s"tbl_x", s"$db.tbl_y") {
+ // Try to find non existing tables.
+ assert(!spark.catalog.tableExists("tbl_x"))
+ assert(!spark.catalog.tableExists("tbl_y"))
+ assert(!spark.catalog.tableExists(db, "tbl_y"))
+
+ // Create objects.
+ createTempTable("tbl_x")
+ createTable("tbl_y", Some(db))
+
+ // Find a temporary table
+ assert(spark.catalog.tableExists("tbl_x"))
+
+ // Find a qualified table
+ assert(spark.catalog.tableExists(db, "tbl_y"))
+
+ // Find an unqualified table using the current database
+ assert(!spark.catalog.tableExists("tbl_y"))
+ spark.catalog.setCurrentDatabase(db)
+ assert(spark.catalog.tableExists("tbl_y"))
+ }
+ }
+ }
+
+ test("function exists") {
+ withTempDatabase { db =>
+ withUserDefinedFunction("fn1" -> true, s"$db.fn2" -> false) {
+ // Try to find non existing functions.
+ assert(!spark.catalog.functionExists("fn1"))
+ assert(!spark.catalog.functionExists("fn2"))
+ assert(!spark.catalog.functionExists(db, "fn2"))
+
+ // Create objects.
+ createTempFunction("fn1")
+ createFunction("fn2", Some(db))
+
+ // Find a temporary function
+ assert(spark.catalog.functionExists("fn1"))
+
+ // Find a qualified function
+ assert(spark.catalog.functionExists(db, "fn2"))
+
+ // Find an unqualified function using the current database
+ assert(!spark.catalog.functionExists("fn2"))
+ spark.catalog.setCurrentDatabase(db)
+ assert(spark.catalog.functionExists("fn2"))
+ }
+ }
+ }
+
// TODO: add tests for the rest of them
}