diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-05-09 12:40:30 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-05-09 12:40:30 +0800 |
commit | e9131ec277731de4a73026f2fb4559182c236f84 (patch) | |
tree | 2822aa995ed55154f7112b44bbb98821df2f6936 | |
parent | 454ba4d67e782369627dfe60261e6648a27b91a0 (diff) | |
download | spark-e9131ec277731de4a73026f2fb4559182c236f84.tar.gz spark-e9131ec277731de4a73026f2fb4559182c236f84.tar.bz2 spark-e9131ec277731de4a73026f2fb4559182c236f84.zip |
[SPARK-15185][SQL] InMemoryCatalog: Silent Removal of an Existent Table/Function/Partitions by Rename
#### What changes were proposed in this pull request?
So far, in the implementation of InMemoryCatalog, we do not check if the new/destination table/function/partition exists or not. Thus, we just silently remove the existent table/function/partition.
This PR is to detect them and issue an appropriate exception.
#### How was this patch tested?
Added the related test cases. They also verify if HiveExternalCatalog also detects these errors.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #12960 from gatorsmile/renameInMemoryCatalog.
2 files changed, 62 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index c65f461129..676a9e10ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -59,6 +59,13 @@ class InMemoryCatalog extends ExternalCatalog { } } + private def requireFunctionNotExists(db: String, funcName: String): Unit = { + if (functionExists(db, funcName)) { + throw new AnalysisException( + s"Function already exists: '$funcName' exists in database '$db'") + } + } + private def requireTableExists(db: String, table: String): Unit = { if (!tableExists(db, table)) { throw new AnalysisException( @@ -66,10 +73,34 @@ class InMemoryCatalog extends ExternalCatalog { } } - private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { - if (!partitionExists(db, table, spec)) { + private def requireTableNotExists(db: String, table: String): Unit = { + if (tableExists(db, table)) { throw new AnalysisException( - s"Partition not found: database '$db' table '$table' does not contain: '$spec'") + s"Table or view exists: '$table' exists in database '$db'") + } + } + + private def requirePartitionsExist( + db: String, + table: String, + specs: Seq[TablePartitionSpec]): Unit = { + specs foreach { s => + if (!partitionExists(db, table, s)) { + throw new AnalysisException( + s"Partition not found: database '$db' table '$table' does not contain: '$s'") + } + } + } + + private def requirePartitionsNotExist( + db: String, + table: String, + specs: Seq[TablePartitionSpec]): Unit = { + specs foreach { s => + if (partitionExists(db, table, s)) { + throw new AnalysisException( + s"Partition exists: database '$db' table '$table' already contains: '$s'") + } } } @@ -171,6 +202,7 @@ class InMemoryCatalog extends ExternalCatalog { override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized { requireTableExists(db, oldName) + requireTableNotExists(db, newName) val oldDesc = catalog(db).tables(oldName) oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db))) catalog(db).tables.put(newName, oldDesc) @@ -272,6 +304,8 @@ class InMemoryCatalog extends ExternalCatalog { specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit = synchronized { require(specs.size == newSpecs.size, "number of old and new partition specs differ") + requirePartitionsExist(db, table, specs) + requirePartitionsNotExist(db, table, newSpecs) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec) val existingParts = catalog(db).tables(table).partitions @@ -284,8 +318,8 @@ class InMemoryCatalog extends ExternalCatalog { db: String, table: String, parts: Seq[CatalogTablePartition]): Unit = synchronized { + requirePartitionsExist(db, table, parts.map(p => p.spec)) parts.foreach { p => - requirePartitionExists(db, table, p.spec) catalog(db).tables(table).partitions.put(p.spec, p) } } @@ -294,7 +328,7 @@ class InMemoryCatalog extends ExternalCatalog { db: String, table: String, spec: TablePartitionSpec): CatalogTablePartition = synchronized { - requirePartitionExists(db, table, spec) + requirePartitionsExist(db, table, Seq(spec)) catalog(db).tables(table).partitions(spec) } @@ -330,6 +364,7 @@ class InMemoryCatalog extends ExternalCatalog { override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized { requireFunctionExists(db, oldName) + requireFunctionNotExists(db, newName) val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db))) catalog(db).functions.remove(oldName) catalog(db).functions.put(newName, newFunc) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index ae7c503e65..e347734290 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -198,6 +198,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } } + test("rename table when destination table already exists") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.renameTable("db2", "tbl1", "tbl2") + } + } + test("alter table") { val catalog = newBasicCatalog() val tbl1 = catalog.getTable("db2", "tbl1") @@ -356,6 +363,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } } + test("rename partitions when the new partition already exists") { + val catalog = newBasicCatalog() + intercept[AnalysisException] { + catalog.renamePartitions("db2", "tbl2", Seq(part1.spec), Seq(part2.spec)) + } + } + test("alter partitions") { val catalog = newBasicCatalog() try { @@ -480,6 +494,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } } + test("rename function when new function already exists") { + val catalog = newBasicCatalog() + catalog.createFunction("db2", newFunc("func2", Some("db2"))) + intercept[AnalysisException] { + catalog.renameFunction("db2", "func1", "func2") + } + } + test("list functions") { val catalog = newBasicCatalog() catalog.createFunction("db2", newFunc("func2")) |