aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2014-11-03 13:59:43 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-03 13:59:43 -0800
commite83f13e8d37ca33f4e183e977d077221b90c6025 (patch)
treeca4225da3ff6200063eab253c4b111435c4f862d
parentc238fb423d1011bd1b1e6201d769b72e52664fc6 (diff)
downloadspark-e83f13e8d37ca33f4e183e977d077221b90c6025.tar.gz
spark-e83f13e8d37ca33f4e183e977d077221b90c6025.tar.bz2
spark-e83f13e8d37ca33f4e183e977d077221b90c6025.zip
[SPARK-4152] [SQL] Avoid data change in CTAS while table already existed
CREATE TABLE t1 (a String); CREATE TABLE t1 AS SELECT key FROM src; – throw exception CREATE TABLE if not exists t1 AS SELECT key FROM src; – expect do nothing, currently it will overwrite the t1, which is incorrect. Author: Cheng Hao <hao.cheng@intel.com> Closes #3013 from chenghao-intel/ctas_unittest and squashes the following commits: 194113e [Cheng Hao] fix bug in CTAS when table already existed
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala22
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala6
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala12
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala9
4 files changed, 46 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 2059a91ba0..0415d74bd8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,6 +28,8 @@ trait Catalog {
def caseSensitive: Boolean
+ def tableExists(db: Option[String], tableName: String): Boolean
+
def lookupRelation(
databaseName: Option[String],
tableName: String,
@@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
tables.clear()
}
+ override def tableExists(db: Option[String], tableName: String): Boolean = {
+ val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+ tables.get(tblName) match {
+ case Some(_) => true
+ case None => false
+ }
+ }
+
override def lookupRelation(
databaseName: Option[String],
tableName: String,
@@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog {
// TODO: This doesn't work when the database changes...
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
+ abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
+ val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+ overrides.get((dbName, tblName)) match {
+ case Some(_) => true
+ case None => super.tableExists(db, tableName)
+ }
+ }
+
abstract override def lookupRelation(
databaseName: Option[String],
tableName: String,
@@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog {
val caseSensitive: Boolean = true
+ def tableExists(db: Option[String], tableName: String): Boolean = {
+ throw new UnsupportedOperationException
+ }
+
def lookupRelation(
databaseName: Option[String],
tableName: String,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 096b4a07aa..0baf4c9f8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val caseSensitive: Boolean = false
+ def tableExists(db: Option[String], tableName: String): Boolean = {
+ val (databaseName, tblName) = processDatabaseAndTableName(
+ db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+ client.getTable(databaseName, tblName, false) != null
+ }
+
def lookupRelation(
db: Option[String],
tableName: String,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 2fce414734..3d24d87bc3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -71,7 +71,17 @@ case class CreateTableAsSelect(
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+ if (sc.catalog.tableExists(Some(database), tableName)) {
+ if (allowExisting) {
+ // table already exists, will do nothing, to keep consistent with Hive
+ } else {
+ throw
+ new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
+ }
+ } else {
+ sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+ }
+
Seq.empty[Row]
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 76a0ec01a6..e9b1943ff8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -56,7 +56,7 @@ class SQLQuerySuite extends QueryTest {
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
| SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
- // expect the string => integer for field key cause the table ctas4 already existed.
+ // do nothing cause the table ctas4 already existed.
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
| SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
@@ -78,9 +78,14 @@ class SQLQuerySuite extends QueryTest {
SELECT key, value
FROM src
ORDER BY key, value""").collect().toSeq)
+ intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
+ sql(
+ """CREATE TABLE ctas4 AS
+ | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
+ }
checkAnswer(
sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
- sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq)
+ sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)
checkExistence(sql("DESC EXTENDED ctas2"), true,
"name:key", "type:string", "name:value", "ctas2",