diff options
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 62 |
1 files changed, 39 insertions, 23 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 34803133f6..ad989a97e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -39,7 +39,7 @@ abstract class ExternalCatalog { protected def requireDbExists(db: String): Unit = { if (!databaseExists(db)) { - throw new AnalysisException(s"Database $db does not exist") + throw new AnalysisException(s"Database '$db' does not exist") } } @@ -91,6 +91,8 @@ abstract class ExternalCatalog { def getTable(db: String, table: String): CatalogTable + def getTableOption(db: String, table: String): Option[CatalogTable] + def tableExists(db: String, table: String): Boolean def listTables(db: String): Seq[String] @@ -150,17 +152,10 @@ abstract class ExternalCatalog { def renameFunction(db: String, oldName: String, newName: String): Unit - /** - * Alter a function whose name that matches the one specified in `funcDefinition`, - * assuming the function exists. - * - * Note: If the underlying implementation does not support altering a certain field, - * this becomes a no-op. - */ - def alterFunction(db: String, funcDefinition: CatalogFunction): Unit - def getFunction(db: String, funcName: String): CatalogFunction + def functionExists(db: String, funcName: String): Boolean + def listFunctions(db: String, pattern: String): Seq[String] } @@ -169,10 +164,15 @@ abstract class ExternalCatalog { /** * A function defined in the catalog. * - * @param name name of the function + * @param identifier name of the function * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc" + * @param resources resource types and Uris used by the function */ -case class CatalogFunction(name: FunctionIdentifier, className: String) +// TODO: Use FunctionResource instead of (String, String) as the element type of resources. +case class CatalogFunction( + identifier: FunctionIdentifier, + className: String, + resources: Seq[(String, String)]) /** @@ -216,26 +216,42 @@ case class CatalogTablePartition( * future once we have a better understanding of how we want to handle skewed columns. */ case class CatalogTable( - name: TableIdentifier, + identifier: TableIdentifier, tableType: CatalogTableType, storage: CatalogStorageFormat, schema: Seq[CatalogColumn], - partitionColumns: Seq[CatalogColumn] = Seq.empty, - sortColumns: Seq[CatalogColumn] = Seq.empty, - numBuckets: Int = 0, + partitionColumnNames: Seq[String] = Seq.empty, + sortColumnNames: Seq[String] = Seq.empty, + bucketColumnNames: Seq[String] = Seq.empty, + numBuckets: Int = -1, createTime: Long = System.currentTimeMillis, - lastAccessTime: Long = System.currentTimeMillis, + lastAccessTime: Long = -1, properties: Map[String, String] = Map.empty, viewOriginalText: Option[String] = None, - viewText: Option[String] = None) { + viewText: Option[String] = None, + comment: Option[String] = None) { + + // Verify that the provided columns are part of the schema + private val colNames = schema.map(_.name).toSet + private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = { + require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " + + s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'") + } + requireSubsetOfSchema(partitionColumnNames, "partition") + requireSubsetOfSchema(sortColumnNames, "sort") + requireSubsetOfSchema(bucketColumnNames, "bucket") + + /** Columns this table is partitioned by. */ + def partitionColumns: Seq[CatalogColumn] = + schema.filter { c => partitionColumnNames.contains(c.name) } /** Return the database this table was specified to belong to, assuming it exists. */ - def database: String = name.database.getOrElse { - throw new AnalysisException(s"table $name did not specify database") + def database: String = identifier.database.getOrElse { + throw new AnalysisException(s"table $identifier did not specify database") } /** Return the fully qualified name of this table, assuming the database was specified. */ - def qualifiedName: String = name.unquotedString + def qualifiedName: String = identifier.unquotedString /** Syntactic sugar to update a field in `storage`. */ def withNewStorage( @@ -290,6 +306,6 @@ case class CatalogRelation( // TODO: implement this override def output: Seq[Attribute] = Seq.empty - require(metadata.name.database == Some(db), - "provided database does not much the one specified in the table definition") + require(metadata.identifier.database == Some(db), + "provided database does not match the one specified in the table definition") } |