author     Andrew Or <andrew@databricks.com>    2016-02-03 19:32:41 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-02-03 19:32:41 -0800
commit     a64831124c215f56f124747fa241560c70cf0a36
tree       7765889ae9ff1f388ea559541170b45406cd4203
parent     a8e2ba776b20c8054918af646d8228bba1b87c9b
[SPARK-13079][SQL] Extend and implement InMemoryCatalog
This is a step towards consolidating `SQLContext` and `HiveContext`.

This patch extends the existing Catalog API added in #10982 to include methods for handling table partitions. In particular, a partition is identified by `PartitionSpec`, which is just a `Map[String, String]`. The Catalog is still not used by anything yet, but its API is now more or less complete and an implementation is fully tested. About 200 lines are test code.

Author: Andrew Or <andrew@databricks.com>

Closes #11069 from andrewor14/catalog.
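For context, a minimal sketch of the extended partition API in use, assembled from the signatures and test fixtures in this patch. The `StorageFormat` field names other than `serde` are not visible in the diff, so its argument values below are positional guesses mirroring the test code; the spec value `"ds" -> "2016-02-03"` is likewise illustrative only.

import org.apache.spark.sql.catalyst.catalog._

// Sketch only: constructor arguments mirror this patch's test fixtures.
val catalog = new InMemoryCatalog
catalog.createDatabase(Database("db1", "db1 description", "uri", Map.empty), ignoreIfExists = false)
catalog.createTable(
  "db1",
  Table("tbl1", "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0, None, None),
  ignoreIfExists = false)

// A partition is identified by its spec, which is just a Map[String, String].
val storage = StorageFormat("usa", "$", "zzz", "serde", Map.empty[String, String])
val part = TablePartition(Map("ds" -> "2016-02-03"), storage)
catalog.createPartitions("db1", "tbl1", Seq(part), ignoreIfExists = false)
assert(catalog.getPartition("db1", "tbl1", Map("ds" -> "2016-02-03")) == part)
assert(catalog.listPartitions("db1", "tbl1") == Seq(part))
catalog.dropPartitions("db1", "tbl1", Seq(part.spec), ignoreIfNotExists = false)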
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala   129
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala          40
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala  206
3 files changed, 328 insertions, 47 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 9e6dfb7e95..38be61c52a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.AnalysisException
* All public methods should be synchronized for thread-safety.
*/
class InMemoryCatalog extends Catalog {
+ import Catalog._
private class TableDesc(var table: Table) {
- val partitions = new mutable.HashMap[String, TablePartition]
+ val partitions = new mutable.HashMap[PartitionSpec, TablePartition]
}
private class DatabaseDesc(var db: Database) {
@@ -46,13 +47,20 @@ class InMemoryCatalog extends Catalog {
}
private def existsFunction(db: String, funcName: String): Boolean = {
+ assertDbExists(db)
catalog(db).functions.contains(funcName)
}
private def existsTable(db: String, table: String): Boolean = {
+ assertDbExists(db)
catalog(db).tables.contains(table)
}
+ private def existsPartition(db: String, table: String, spec: PartitionSpec): Boolean = {
+ assertTableExists(db, table)
+ catalog(db).tables(table).partitions.contains(spec)
+ }
+
private def assertDbExists(db: String): Unit = {
if (!catalog.contains(db)) {
throw new AnalysisException(s"Database $db does not exist")
@@ -60,16 +68,20 @@ class InMemoryCatalog extends Catalog {
}
private def assertFunctionExists(db: String, funcName: String): Unit = {
- assertDbExists(db)
if (!existsFunction(db, funcName)) {
- throw new AnalysisException(s"Function $funcName does not exists in $db database")
+ throw new AnalysisException(s"Function $funcName does not exist in $db database")
}
}
private def assertTableExists(db: String, table: String): Unit = {
- assertDbExists(db)
if (!existsTable(db, table)) {
- throw new AnalysisException(s"Table $table does not exists in $db database")
+ throw new AnalysisException(s"Table $table does not exist in $db database")
+ }
+ }
+
+ private def assertPartitionExists(db: String, table: String, spec: PartitionSpec): Unit = {
+ if (!existsPartition(db, table, spec)) {
+ throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec")
}
}
@@ -77,9 +89,11 @@ class InMemoryCatalog extends Catalog {
// Databases
// --------------------------------------------------------------------------
- override def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit = synchronized {
+ override def createDatabase(
+ dbDefinition: Database,
+ ignoreIfExists: Boolean): Unit = synchronized {
if (catalog.contains(dbDefinition.name)) {
- if (!ifNotExists) {
+ if (!ignoreIfExists) {
throw new AnalysisException(s"Database ${dbDefinition.name} already exists.")
}
} else {
@@ -88,9 +102,9 @@ class InMemoryCatalog extends Catalog {
}
override def dropDatabase(
- db: String,
- ignoreIfNotExists: Boolean,
- cascade: Boolean): Unit = synchronized {
+ db: String,
+ ignoreIfNotExists: Boolean,
+ cascade: Boolean): Unit = synchronized {
if (catalog.contains(db)) {
if (!cascade) {
// If cascade is false, make sure the database is empty.
@@ -133,11 +147,13 @@ class InMemoryCatalog extends Catalog {
// Tables
// --------------------------------------------------------------------------
- override def createTable(db: String, tableDefinition: Table, ifNotExists: Boolean)
- : Unit = synchronized {
+ override def createTable(
+ db: String,
+ tableDefinition: Table,
+ ignoreIfExists: Boolean): Unit = synchronized {
assertDbExists(db)
if (existsTable(db, tableDefinition.name)) {
- if (!ifNotExists) {
+ if (!ignoreIfExists) {
throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database")
}
} else {
@@ -145,8 +161,10 @@ class InMemoryCatalog extends Catalog {
}
}
- override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean)
- : Unit = synchronized {
+ override def dropTable(
+ db: String,
+ table: String,
+ ignoreIfNotExists: Boolean): Unit = synchronized {
assertDbExists(db)
if (existsTable(db, table)) {
catalog(db).tables.remove(table)
@@ -190,14 +208,67 @@ class InMemoryCatalog extends Catalog {
// Partitions
// --------------------------------------------------------------------------
- override def alterPartition(db: String, table: String, part: TablePartition)
- : Unit = synchronized {
- throw new UnsupportedOperationException
+ override def createPartitions(
+ db: String,
+ table: String,
+ parts: Seq[TablePartition],
+ ignoreIfExists: Boolean): Unit = synchronized {
+ assertTableExists(db, table)
+ val existingParts = catalog(db).tables(table).partitions
+ if (!ignoreIfExists) {
+ val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec }
+ if (dupSpecs.nonEmpty) {
+ val dupSpecsStr = dupSpecs.mkString("\n===\n")
+ throw new AnalysisException(
+ s"The following partitions already exist in database $db table $table:\n$dupSpecsStr")
+ }
+ }
+ parts.foreach { p => existingParts.put(p.spec, p) }
+ }
+
+ override def dropPartitions(
+ db: String,
+ table: String,
+ partSpecs: Seq[PartitionSpec],
+ ignoreIfNotExists: Boolean): Unit = synchronized {
+ assertTableExists(db, table)
+ val existingParts = catalog(db).tables(table).partitions
+ if (!ignoreIfNotExists) {
+ val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s }
+ if (missingSpecs.nonEmpty) {
+ val missingSpecsStr = missingSpecs.mkString("\n===\n")
+ throw new AnalysisException(
+ s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr")
+ }
+ }
+ partSpecs.foreach(existingParts.remove)
}
- override def alterPartitions(db: String, table: String, parts: Seq[TablePartition])
- : Unit = synchronized {
- throw new UnsupportedOperationException
+ override def alterPartition(
+ db: String,
+ table: String,
+ spec: Map[String, String],
+ newPart: TablePartition): Unit = synchronized {
+ assertPartitionExists(db, table, spec)
+ val existingParts = catalog(db).tables(table).partitions
+ if (spec != newPart.spec) {
+ // Also a change in specs; remove the old one and add the new one back
+ existingParts.remove(spec)
+ }
+ existingParts.put(newPart.spec, newPart)
+ }
+
+ override def getPartition(
+ db: String,
+ table: String,
+ spec: Map[String, String]): TablePartition = synchronized {
+ assertPartitionExists(db, table, spec)
+ catalog(db).tables(table).partitions(spec)
+ }
+
+ override def listPartitions(db: String, table: String): Seq[TablePartition] = synchronized {
+ assertTableExists(db, table)
+ catalog(db).tables(table).partitions.values.toSeq
}
// --------------------------------------------------------------------------
@@ -205,11 +276,12 @@ class InMemoryCatalog extends Catalog {
// --------------------------------------------------------------------------
override def createFunction(
- db: String, func: Function, ifNotExists: Boolean): Unit = synchronized {
+ db: String,
+ func: Function,
+ ignoreIfExists: Boolean): Unit = synchronized {
assertDbExists(db)
-
if (existsFunction(db, func.name)) {
- if (!ifNotExists) {
+ if (!ignoreIfExists) {
throw new AnalysisException(s"Function $func already exists in $db database")
}
} else {
@@ -222,14 +294,16 @@ class InMemoryCatalog extends Catalog {
catalog(db).functions.remove(funcName)
}
- override def alterFunction(db: String, funcName: String, funcDefinition: Function)
- : Unit = synchronized {
+ override def alterFunction(
+ db: String,
+ funcName: String,
+ funcDefinition: Function): Unit = synchronized {
assertFunctionExists(db, funcName)
if (funcName != funcDefinition.name) {
// Also a rename; remove the old one and add the new one back
catalog(db).functions.remove(funcName)
}
- catalog(db).functions.put(funcName, funcDefinition)
+ catalog(db).functions.put(funcDefinition.name, funcDefinition)
}
override def getFunction(db: String, funcName: String): Function = synchronized {
@@ -239,7 +313,6 @@ class InMemoryCatalog extends Catalog {
override def listFunctions(db: String, pattern: String): Seq[String] = synchronized {
assertDbExists(db)
- val regex = pattern.replaceAll("\\*", ".*").r
filterPattern(catalog(db).functions.keysIterator.toSeq, pattern)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a6caf91f33..b4d7dd2f4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -29,17 +29,15 @@ import org.apache.spark.sql.AnalysisException
* Implementations should throw [[AnalysisException]] when table or database don't exist.
*/
abstract class Catalog {
+ import Catalog._
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
- def createDatabase(dbDefinition: Database, ifNotExists: Boolean): Unit
+ def createDatabase(dbDefinition: Database, ignoreIfExists: Boolean): Unit
- def dropDatabase(
- db: String,
- ignoreIfNotExists: Boolean,
- cascade: Boolean): Unit
+ def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
def alterDatabase(db: String, dbDefinition: Database): Unit
@@ -71,11 +69,28 @@ abstract class Catalog {
// Partitions
// --------------------------------------------------------------------------
- // TODO: need more functions for partitioning.
+ def createPartitions(
+ db: String,
+ table: String,
+ parts: Seq[TablePartition],
+ ignoreIfExists: Boolean): Unit
- def alterPartition(db: String, table: String, part: TablePartition): Unit
+ def dropPartitions(
+ db: String,
+ table: String,
+ parts: Seq[PartitionSpec],
+ ignoreIfNotExists: Boolean): Unit
- def alterPartitions(db: String, table: String, parts: Seq[TablePartition]): Unit
+ def alterPartition(
+ db: String,
+ table: String,
+ spec: PartitionSpec,
+ newPart: TablePartition): Unit
+
+ def getPartition(db: String, table: String, spec: PartitionSpec): TablePartition
+
+ // TODO: support listing by pattern
+ def listPartitions(db: String, table: String): Seq[TablePartition]
// --------------------------------------------------------------------------
// Functions
@@ -132,11 +147,11 @@ case class Column(
/**
* A partition (Hive style) defined in the catalog.
*
- * @param values values for the partition columns
+ * @param spec partition spec values indexed by column name
* @param storage storage format of the partition
*/
case class TablePartition(
- values: Seq[String],
+ spec: Catalog.PartitionSpec,
storage: StorageFormat
)
@@ -176,3 +191,8 @@ case class Database(
locationUri: String,
properties: Map[String, String]
)
+
+
+object Catalog {
+ type PartitionSpec = Map[String, String]
+}
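The new companion object names the `Map[String, String]` spec type; both `Catalog` and `InMemoryCatalog` pull the alias in via `import Catalog._`. For illustration:

import org.apache.spark.sql.catalyst.catalog.Catalog.PartitionSpec

// A PartitionSpec maps partition column names to their values.
val spec: PartitionSpec = Map("year" -> "2016", "month" -> "02")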
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index ab9d5ac8a2..0d8434323f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -27,6 +27,11 @@ import org.apache.spark.sql.AnalysisException
* Implementations of the [[Catalog]] interface can create test suites by extending this.
*/
abstract class CatalogTestCases extends SparkFunSuite {
+ private val storageFormat = StorageFormat("usa", "$", "zzz", "serde", Map.empty[String, String])
+ private val part1 = TablePartition(Map[String, String]("a" -> "1"), storageFormat)
+ private val part2 = TablePartition(Map[String, String]("b" -> "2"), storageFormat)
+ private val part3 = TablePartition(Map[String, String]("c" -> "3"), storageFormat)
+ private val funcClass = "org.apache.spark.myFunc"
protected def newEmptyCatalog(): Catalog
@@ -41,16 +46,16 @@ abstract class CatalogTestCases extends SparkFunSuite {
*/
private def newBasicCatalog(): Catalog = {
val catalog = newEmptyCatalog()
- catalog.createDatabase(newDb("db1"), ifNotExists = false)
- catalog.createDatabase(newDb("db2"), ifNotExists = false)
-
+ catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+ catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
catalog.createTable("db2", newTable("tbl1"), ignoreIfExists = false)
catalog.createTable("db2", newTable("tbl2"), ignoreIfExists = false)
catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+ catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false)
catalog
}
- private def newFunc(): Function = Function("funcname", "org.apache.spark.MyFunc")
+ private def newFunc(): Function = Function("funcname", funcClass)
private def newDb(name: String = "default"): Database =
Database(name, name + " description", "uri", Map.empty)
@@ -59,7 +64,7 @@ abstract class CatalogTestCases extends SparkFunSuite {
Table(name, "", Seq.empty, Seq.empty, Seq.empty, null, 0, Map.empty, "EXTERNAL_TABLE", 0, 0,
None, None)
- private def newFunc(name: String): Function = Function(name, "class.name")
+ private def newFunc(name: String): Function = Function(name, funcClass)
// --------------------------------------------------------------------------
// Databases
@@ -67,10 +72,10 @@ abstract class CatalogTestCases extends SparkFunSuite {
test("basic create, drop and list databases") {
val catalog = newEmptyCatalog()
- catalog.createDatabase(newDb(), ifNotExists = false)
+ catalog.createDatabase(newDb(), ignoreIfExists = false)
assert(catalog.listDatabases().toSet == Set("default"))
- catalog.createDatabase(newDb("default2"), ifNotExists = false)
+ catalog.createDatabase(newDb("default2"), ignoreIfExists = false)
assert(catalog.listDatabases().toSet == Set("default", "default2"))
}
@@ -253,11 +258,194 @@ abstract class CatalogTestCases extends SparkFunSuite {
// Partitions
// --------------------------------------------------------------------------
- // TODO: Add tests cases for partitions
+ test("basic create and list partitions") {
+ val catalog = newEmptyCatalog()
+ catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+ catalog.createTable("mydb", newTable("mytbl"), ignoreIfExists = false)
+ catalog.createPartitions("mydb", "mytbl", Seq(part1, part2), ignoreIfExists = false)
+ assert(catalog.listPartitions("mydb", "mytbl").toSet == Set(part1, part2))
+ }
+
+ test("create partitions when database / table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.createPartitions("does_not_exist", "tbl1", Seq(), ignoreIfExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.createPartitions("db2", "does_not_exist", Seq(), ignoreIfExists = false)
+ }
+ }
+
+ test("create partitions that already exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = false)
+ }
+ catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
+ }
+
+ test("drop partitions") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+ catalog.dropPartitions("db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+ assert(catalog.listPartitions("db2", "tbl2").toSet == Set(part2))
+ val catalog2 = newBasicCatalog()
+ assert(catalog2.listPartitions("db2", "tbl2").toSet == Set(part1, part2))
+ catalog2.dropPartitions("db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+ assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
+ }
+
+ test("drop partitions when database / table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.dropPartitions("does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+ }
+ intercept[AnalysisException] {
+ catalog.dropPartitions("db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+ }
+ }
+
+ test("drop partitions that do not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+ }
+ catalog.dropPartitions("db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+ }
+
+ test("get partition") {
+ val catalog = newBasicCatalog()
+ assert(catalog.getPartition("db2", "tbl2", part1.spec) == part1)
+ assert(catalog.getPartition("db2", "tbl2", part2.spec) == part2)
+ intercept[AnalysisException] {
+ catalog.getPartition("db2", "tbl1", part3.spec)
+ }
+ }
+
+ test("get partition when database / table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.getPartition("does_not_exist", "tbl1", part1.spec)
+ }
+ intercept[AnalysisException] {
+ catalog.getPartition("db2", "does_not_exist", part1.spec)
+ }
+ }
+
+ test("alter partitions") {
+ val catalog = newBasicCatalog()
+ val partSameSpec = part1.copy(storage = storageFormat.copy(serde = "myserde"))
+ val partNewSpec = part1.copy(spec = Map("x" -> "10"))
+ // alter but keep spec the same
+ catalog.alterPartition("db2", "tbl2", part1.spec, partSameSpec)
+ assert(catalog.getPartition("db2", "tbl2", part1.spec) == partSameSpec)
+ // alter and change spec
+ catalog.alterPartition("db2", "tbl2", part1.spec, partNewSpec)
+ intercept[AnalysisException] {
+ catalog.getPartition("db2", "tbl2", part1.spec)
+ }
+ assert(catalog.getPartition("db2", "tbl2", partNewSpec.spec) == partNewSpec)
+ }
+
+ test("alter partition when database / table does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.alterPartition("does_not_exist", "tbl1", part1.spec, part1)
+ }
+ intercept[AnalysisException] {
+ catalog.alterPartition("db2", "does_not_exist", part1.spec, part1)
+ }
+ }
// --------------------------------------------------------------------------
// Functions
// --------------------------------------------------------------------------
- // TODO: Add tests cases for functions
+ test("basic create and list functions") {
+ val catalog = newEmptyCatalog()
+ catalog.createDatabase(newDb("mydb"), ignoreIfExists = false)
+ catalog.createFunction("mydb", newFunc("myfunc"), ignoreIfExists = false)
+ assert(catalog.listFunctions("mydb", "*").toSet == Set("myfunc"))
+ }
+
+ test("create function when database does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.createFunction("does_not_exist", newFunc(), ignoreIfExists = false)
+ }
+ }
+
+ test("create function that already exists") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = false)
+ }
+ catalog.createFunction("db2", newFunc("func1"), ignoreIfExists = true)
+ }
+
+ test("drop function") {
+ val catalog = newBasicCatalog()
+ assert(catalog.listFunctions("db2", "*").toSet == Set("func1"))
+ catalog.dropFunction("db2", "func1")
+ assert(catalog.listFunctions("db2", "*").isEmpty)
+ }
+
+ test("drop function when database does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.dropFunction("does_not_exist", "something")
+ }
+ }
+
+ test("drop function that does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.dropFunction("db2", "does_not_exist")
+ }
+ }
+
+ test("get function") {
+ val catalog = newBasicCatalog()
+ assert(catalog.getFunction("db2", "func1") == newFunc("func1"))
+ intercept[AnalysisException] {
+ catalog.getFunction("db2", "does_not_exist")
+ }
+ }
+
+ test("get function when database does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.getFunction("does_not_exist", "func1")
+ }
+ }
+
+ test("alter function") {
+ val catalog = newBasicCatalog()
+ assert(catalog.getFunction("db2", "func1").className == funcClass)
+ // alter func but keep name
+ catalog.alterFunction("db2", "func1", newFunc("func1").copy(className = "muhaha"))
+ assert(catalog.getFunction("db2", "func1").className == "muhaha")
+ // alter func and change name
+ catalog.alterFunction("db2", "func1", newFunc("funcky"))
+ intercept[AnalysisException] {
+ catalog.getFunction("db2", "func1")
+ }
+ assert(catalog.getFunction("db2", "funcky").className == funcClass)
+ }
+
+ test("alter function when database does not exist") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.alterFunction("does_not_exist", "func1", newFunc())
+ }
+ }
+
+ test("list functions") {
+ val catalog = newBasicCatalog()
+ catalog.createFunction("db2", newFunc("func2"), ignoreIfExists = false)
+ catalog.createFunction("db2", newFunc("not_me"), ignoreIfExists = false)
+ assert(catalog.listFunctions("db2", "*").toSet == Set("func1", "func2", "not_me"))
+ assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
+ }
+
}
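As the class doc above notes, a concrete catalog implementation gets this entire suite by overriding the single factory method. A hypothetical subclass for the in-memory implementation (no such suite appears in this diff) would be:

// Sketch: runs all CatalogTestCases tests against InMemoryCatalog.
class InMemoryCatalogSuite extends CatalogTestCases {
  override protected def newEmptyCatalog(): Catalog = new InMemoryCatalog
}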