Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 62 ++++++++++-----
1 file changed, 39 insertions(+), 23 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 34803133f6..ad989a97e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -39,7 +39,7 @@ abstract class ExternalCatalog {
protected def requireDbExists(db: String): Unit = {
if (!databaseExists(db)) {
- throw new AnalysisException(s"Database $db does not exist")
+ throw new AnalysisException(s"Database '$db' does not exist")
}
}
@@ -91,6 +91,8 @@ abstract class ExternalCatalog {
def getTable(db: String, table: String): CatalogTable
+ def getTableOption(db: String, table: String): Option[CatalogTable]
+
def tableExists(db: String, table: String): Boolean
def listTables(db: String): Seq[String]
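
The new getTableOption gives callers a non-throwing lookup alongside getTable. A minimal sketch of how the two might relate in an implementation (the SimpleCatalog class and its backing map below are illustrative, not the actual InMemoryCatalog code):

    import scala.collection.mutable
    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // Illustrative toy catalog keyed by (database, table) name.
    class SimpleCatalog {
      private val tables = mutable.Map.empty[(String, String), CatalogTable]

      // The new non-throwing lookup: absence is represented as None.
      def getTableOption(db: String, table: String): Option[CatalogTable] =
        tables.get((db, table))

      // The throwing variant can delegate to the Option-returning one.
      def getTable(db: String, table: String): CatalogTable =
        getTableOption(db, table).getOrElse {
          throw new AnalysisException(s"Table '$table' does not exist in database '$db'")
        }
    }
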
@@ -150,17 +152,10 @@ abstract class ExternalCatalog {
def renameFunction(db: String, oldName: String, newName: String): Unit
- /**
- * Alter a function whose name that matches the one specified in `funcDefinition`,
- * assuming the function exists.
- *
- * Note: If the underlying implementation does not support altering a certain field,
- * this becomes a no-op.
- */
- def alterFunction(db: String, funcDefinition: CatalogFunction): Unit
-
def getFunction(db: String, funcName: String): CatalogFunction
+ def functionExists(db: String, funcName: String): Boolean
+
def listFunctions(db: String, pattern: String): Seq[String]
}
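
With functionExists added, callers can check for a function before calling getFunction rather than catching its failure. A hedged usage sketch (the describeFunction helper is hypothetical, not part of this patch):

    import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, ExternalCatalog}

    // Hypothetical helper: guard getFunction with the new existence check.
    def describeFunction(catalog: ExternalCatalog, db: String, name: String): String = {
      if (catalog.functionExists(db, name)) {
        val fn: CatalogFunction = catalog.getFunction(db, name)
        s"${fn.identifier.unquotedString} -> ${fn.className}"
      } else {
        s"Function '$name' is not registered in database '$db'"
      }
    }
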
@@ -169,10 +164,15 @@ abstract class ExternalCatalog {
/**
* A function defined in the catalog.
*
- * @param name name of the function
+ * @param identifier name of the function
* @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc"
+ * @param resources resource types and URIs used by the function
*/
-case class CatalogFunction(name: FunctionIdentifier, className: String)
+// TODO: Use FunctionResource instead of (String, String) as the element type of resources.
+case class CatalogFunction(
+ identifier: FunctionIdentifier,
+ className: String,
+ resources: Seq[(String, String)])
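
A hedged sketch of constructing the new shape, where each resource is a (type, URI) string pair until the TODO above lands a dedicated FunctionResource type. The function name and jar path are made up; "org.apache.spark.util.MyFunc" is the scaladoc's own example:

    import org.apache.spark.sql.catalyst.FunctionIdentifier
    import org.apache.spark.sql.catalyst.catalog.CatalogFunction

    val fn = CatalogFunction(
      identifier = FunctionIdentifier("my_func", Some("default")),
      className = "org.apache.spark.util.MyFunc",
      resources = Seq(("jar", "hdfs:///tmp/udfs/my_func.jar")))  // illustrative URI
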
/**
@@ -216,26 +216,42 @@ case class CatalogTablePartition(
* future once we have a better understanding of how we want to handle skewed columns.
*/
case class CatalogTable(
- name: TableIdentifier,
+ identifier: TableIdentifier,
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
- partitionColumns: Seq[CatalogColumn] = Seq.empty,
- sortColumns: Seq[CatalogColumn] = Seq.empty,
- numBuckets: Int = 0,
+ partitionColumnNames: Seq[String] = Seq.empty,
+ sortColumnNames: Seq[String] = Seq.empty,
+ bucketColumnNames: Seq[String] = Seq.empty,
+ numBuckets: Int = -1,
createTime: Long = System.currentTimeMillis,
- lastAccessTime: Long = System.currentTimeMillis,
+ lastAccessTime: Long = -1,
properties: Map[String, String] = Map.empty,
viewOriginalText: Option[String] = None,
- viewText: Option[String] = None) {
+ viewText: Option[String] = None,
+ comment: Option[String] = None) {
+
+ // Verify that the provided columns are part of the schema
+ private val colNames = schema.map(_.name).toSet
+ private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
+ require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
+ s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
+ }
+ requireSubsetOfSchema(partitionColumnNames, "partition")
+ requireSubsetOfSchema(sortColumnNames, "sort")
+ requireSubsetOfSchema(bucketColumnNames, "bucket")
+
+ /** Columns this table is partitioned by. */
+ def partitionColumns: Seq[CatalogColumn] =
+ schema.filter { c => partitionColumnNames.contains(c.name) }
/** Return the database this table was specified to belong to, assuming it exists. */
- def database: String = name.database.getOrElse {
- throw new AnalysisException(s"table $name did not specify database")
+ def database: String = identifier.database.getOrElse {
+ throw new AnalysisException(s"table $identifier did not specify database")
}
/** Return the fully qualified name of this table, assuming the database was specified. */
- def qualifiedName: String = name.unquotedString
+ def qualifiedName: String = identifier.unquotedString
/** Syntactic sugar to update a field in `storage`. */
def withNewStorage(
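
Partition, sort, and bucket columns are now plain name lists validated against the schema when the CatalogTable is constructed, and partitionColumns is derived from the schema. A hedged construction sketch (all values illustrative; the CatalogColumn and CatalogStorageFormat argument lists follow this file's definitions at the time, and the CatalogTableType.MANAGED_TABLE companion value is assumed):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog._

    val table = CatalogTable(
      identifier = TableIdentifier("events", Some("default")),
      tableType = CatalogTableType.MANAGED_TABLE,  // assumed companion value
      storage = CatalogStorageFormat(None, None, None, None, Map.empty),
      schema = Seq(
        CatalogColumn("id", "int"),
        CatalogColumn("ds", "string")),
      partitionColumnNames = Seq("ds"))

    table.partitionColumns.map(_.name)  // Seq("ds"), derived from the schema

    // A name missing from the schema now fails fast at construction, e.g.
    // partitionColumnNames = Seq("typo") trips the require with:
    //   "partition columns (typo) must be a subset of schema (id, ds) ..."
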
@@ -290,6 +306,6 @@ case class CatalogRelation(
// TODO: implement this
override def output: Seq[Attribute] = Seq.empty
- require(metadata.name.database == Some(db),
- "provided database does not much the one specified in the table definition")
+ require(metadata.identifier.database == Some(db),
+ "provided database does not match the one specified in the table definition")
}