aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-05-09 12:40:30 +0800
committerWenchen Fan <wenchen@databricks.com>2016-05-09 12:40:30 +0800
commite9131ec277731de4a73026f2fb4559182c236f84 (patch)
tree2822aa995ed55154f7112b44bbb98821df2f6936 /sql
parent454ba4d67e782369627dfe60261e6648a27b91a0 (diff)
downloadspark-e9131ec277731de4a73026f2fb4559182c236f84.tar.gz
spark-e9131ec277731de4a73026f2fb4559182c236f84.tar.bz2
spark-e9131ec277731de4a73026f2fb4559182c236f84.zip
[SPARK-15185][SQL] InMemoryCatalog: Silent Removal of an Existent Table/Function/Partitions by Rename
#### What changes were proposed in this pull request?
So far, in the implementation of InMemoryCatalog, we do not check if the new/destination table/function/partition exists or not. Thus, we just silently remove the existent table/function/partition. This PR is to detect them and issue an appropriate exception.
#### How was this patch tested?
Added the related test cases. They also verify if HiveExternalCatalog also detects these errors.
Author: gatorsmile <gatorsmile@gmail.com> Closes #12960 from gatorsmile/renameInMemoryCatalog.
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 45
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 22
2 files changed, 62 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index c65f461129..676a9e10ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -59,6 +59,13 @@ class InMemoryCatalog extends ExternalCatalog {
}
}
+ private def requireFunctionNotExists(db: String, funcName: String): Unit = {
+ if (functionExists(db, funcName)) {
+ throw new AnalysisException(
+ s"Function already exists: '$funcName' exists in database '$db'")
+ }
+ }
+
private def requireTableExists(db: String, table: String): Unit = {
if (!tableExists(db, table)) {
throw new AnalysisException(
@@ -66,10 +73,34 @@ class InMemoryCatalog extends ExternalCatalog {
}
}
- private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = {
- if (!partitionExists(db, table, spec)) {
+ private def requireTableNotExists(db: String, table: String): Unit = {
+ if (tableExists(db, table)) {
throw new AnalysisException(
- s"Partition not found: database '$db' table '$table' does not contain: '$spec'")
+ s"Table or view exists: '$table' exists in database '$db'")
+ }
+ }
+
+ private def requirePartitionsExist(
+ db: String,
+ table: String,
+ specs: Seq[TablePartitionSpec]): Unit = {
+ specs foreach { s =>
+ if (!partitionExists(db, table, s)) {
+ throw new AnalysisException(
+ s"Partition not found: database '$db' table '$table' does not contain: '$s'")
+ }
+ }
+ }
+
+ private def requirePartitionsNotExist(
+ db: String,
+ table: String,
+ specs: Seq[TablePartitionSpec]): Unit = {
+ specs foreach { s =>
+ if (partitionExists(db, table, s)) {
+ throw new AnalysisException(
+ s"Partition exists: database '$db' table '$table' already contains: '$s'")
+ }
}
}
@@ -171,6 +202,7 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
requireTableExists(db, oldName)
+ requireTableNotExists(db, newName)
val oldDesc = catalog(db).tables(oldName)
oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
catalog(db).tables.put(newName, oldDesc)
@@ -272,6 +304,8 @@ class InMemoryCatalog extends ExternalCatalog {
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = synchronized {
require(specs.size == newSpecs.size, "number of old and new partition specs differ")
+ requirePartitionsExist(db, table, specs)
+ requirePartitionsNotExist(db, table, newSpecs)
specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
val existingParts = catalog(db).tables(table).partitions
@@ -284,8 +318,8 @@ class InMemoryCatalog extends ExternalCatalog {
db: String,
table: String,
parts: Seq[CatalogTablePartition]): Unit = synchronized {
+ requirePartitionsExist(db, table, parts.map(p => p.spec))
parts.foreach { p =>
- requirePartitionExists(db, table, p.spec)
catalog(db).tables(table).partitions.put(p.spec, p)
}
}
@@ -294,7 +328,7 @@ class InMemoryCatalog extends ExternalCatalog {
db: String,
table: String,
spec: TablePartitionSpec): CatalogTablePartition = synchronized {
- requirePartitionExists(db, table, spec)
+ requirePartitionsExist(db, table, Seq(spec))
catalog(db).tables(table).partitions(spec)
}
@@ -330,6 +364,7 @@ class InMemoryCatalog extends ExternalCatalog {
override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
requireFunctionExists(db, oldName)
+ requireFunctionNotExists(db, newName)
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
catalog(db).functions.remove(oldName)
catalog(db).functions.put(newName, newFunc)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index ae7c503e65..e347734290 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -198,6 +198,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}
+ test("rename table when destination table already exists") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.renameTable("db2", "tbl1", "tbl2")
+ }
+ }
+
test("alter table") {
val catalog = newBasicCatalog()
val tbl1 = catalog.getTable("db2", "tbl1")
@@ -356,6 +363,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}
+ test("rename partitions when the new partition already exists") {
+ val catalog = newBasicCatalog()
+ intercept[AnalysisException] {
+ catalog.renamePartitions("db2", "tbl2", Seq(part1.spec), Seq(part2.spec))
+ }
+ }
+
test("alter partitions") {
val catalog = newBasicCatalog()
try {
@@ -480,6 +494,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
}
}
+ test("rename function when new function already exists") {
+ val catalog = newBasicCatalog()
+ catalog.createFunction("db2", newFunc("func2", Some("db2")))
+ intercept[AnalysisException] {
+ catalog.renameFunction("db2", "func1", "func2")
+ }
+ }
+
test("list functions") {
val catalog = newBasicCatalog()
catalog.createFunction("db2", newFunc("func2"))