aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2014-11-03 13:59:43 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-03 13:59:43 -0800
commite83f13e8d37ca33f4e183e977d077221b90c6025 (patch)
treeca4225da3ff6200063eab253c4b111435c4f862d /sql/hive
parentc238fb423d1011bd1b1e6201d769b72e52664fc6 (diff)
downloadspark-e83f13e8d37ca33f4e183e977d077221b90c6025.tar.gz
spark-e83f13e8d37ca33f4e183e977d077221b90c6025.tar.bz2
spark-e83f13e8d37ca33f4e183e977d077221b90c6025.zip
[SPARK-4152] [SQL] Avoid data change in CTAS while table already existed
CREATE TABLE t1 (a String);
CREATE TABLE t1 AS SELECT key FROM src; -- throws an exception
CREATE TABLE IF NOT EXISTS t1 AS SELECT key FROM src; -- expected to do nothing; currently it overwrites t1, which is incorrect.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #3013 from chenghao-intel/ctas_unittest and squashes the following commits:

194113e [Cheng Hao] fix bug in CTAS when table already existed
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala6
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala12
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala9
3 files changed, 24 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 096b4a07aa..0baf4c9f8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val caseSensitive: Boolean = false
+ def tableExists(db: Option[String], tableName: String): Boolean = {
+ val (databaseName, tblName) = processDatabaseAndTableName(
+ db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+ client.getTable(databaseName, tblName, false) != null
+ }
+
def lookupRelation(
db: Option[String],
tableName: String,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 2fce414734..3d24d87bc3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -71,7 +71,17 @@ case class CreateTableAsSelect(
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+ if (sc.catalog.tableExists(Some(database), tableName)) {
+ if (allowExisting) {
+ // table already exists, will do nothing, to keep consistent with Hive
+ } else {
+ throw
+ new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
+ }
+ } else {
+ sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+ }
+
Seq.empty[Row]
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 76a0ec01a6..e9b1943ff8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -56,7 +56,7 @@ class SQLQuerySuite extends QueryTest {
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
| SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
- // expect the string => integer for field key cause the table ctas4 already existed.
+ // do nothing cause the table ctas4 already existed.
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
| SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
@@ -78,9 +78,14 @@ class SQLQuerySuite extends QueryTest {
SELECT key, value
FROM src
ORDER BY key, value""").collect().toSeq)
+ intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
+ sql(
+ """CREATE TABLE ctas4 AS
+ | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
+ }
checkAnswer(
sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
- sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq)
+ sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)
checkExistence(sql("DESC EXTENDED ctas2"), true,
"name:key", "type:string", "name:value", "ctas2",