aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-07-26 09:32:29 +0800
committerWenchen Fan <wenchen@databricks.com>2016-07-26 09:32:29 +0800
commit3fc456694151e766c551b4bc58ed7c9457777666 (patch)
tree38eeb9d7ba104f777544c4a7f9a0443e6095c385
parente164a04b2ba3503e5c14cd1cd4beb40e0b79925a (diff)
downloadspark-3fc456694151e766c551b4bc58ed7c9457777666.tar.gz
spark-3fc456694151e766c551b4bc58ed7c9457777666.tar.bz2
spark-3fc456694151e766c551b4bc58ed7c9457777666.zip
[SPARK-16678][SPARK-16677][SQL] Fix two View-related bugs
## What changes were proposed in this pull request? **Issue 1: Disallow Creating/Altering a View when the same-name Table Exists (without IF NOT EXISTS)** When we create OR alter a view, we check whether the view already exists. In the current implementation, if a table with the same name exists, we treat it as a view. However, this is not the right behavior. We should follow what Hive does. For example, ``` hive> CREATE TABLE tab1 (id int); OK Time taken: 0.196 seconds hive> CREATE OR REPLACE VIEW tab1 AS SELECT * FROM t1; FAILED: SemanticException [Error 10218]: Existing table is not a view The following is an existing table, not a view: default.tab1 hive> ALTER VIEW tab1 AS SELECT * FROM t1; FAILED: SemanticException [Error 10218]: Existing table is not a view The following is an existing table, not a view: default.tab1 hive> CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM t1; OK Time taken: 0.678 seconds ``` **Issue 2: Strange Error when Issuing Load Table Against A View** Users should not be allowed to issue LOAD DATA against a view. Currently, when users do this, they get a very strange runtime error. For example, ```SQL LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName ``` ``` java.lang.reflect.InvocationTargetException was thrown. java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:680) ``` ## How was this patch tested? Added test cases Author: gatorsmile <gatorsmile@gmail.com> Closes #14314 from gatorsmile/tableDDLAgainstView.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala33
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala71
4 files changed, 96 insertions, 19 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 134fc4e698..1856dc4d64 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -443,12 +443,12 @@ class SessionCatalog(
}
/**
- * Return whether a table with the specified name exists.
+ * Return whether a table/view with the specified name exists.
*
- * Note: If a database is explicitly specified, then this will return whether the table
+ * Note: If a database is explicitly specified, then this will return whether the table/view
* exists in that particular database instead. In that case, even if there is a temporary
* table with the same name, we will return false if the specified database does not
- * contain the table.
+ * contain the table/view.
*/
def tableExists(name: TableIdentifier): Boolean = synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 8f3adadbf3..c6daa95286 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -202,35 +202,38 @@ case class LoadDataCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (!catalog.tableExists(table)) {
- throw new AnalysisException(s"Target table in LOAD DATA does not exist: '$table'")
+ throw new AnalysisException(s"Target table in LOAD DATA does not exist: $table")
}
val targetTable = catalog.getTableMetadataOption(table).getOrElse {
- throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: '$table'")
+ throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: $table")
+ }
+ if (targetTable.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $table")
}
if (DDLUtils.isDatasourceTable(targetTable)) {
- throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: '$table'")
+ throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: $table")
}
if (targetTable.partitionColumnNames.nonEmpty) {
if (partition.isEmpty) {
- throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
s"but no partition spec is provided")
}
if (targetTable.partitionColumnNames.size != partition.get.size) {
- throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
s"but number of columns in provided partition spec (${partition.get.size}) " +
s"do not match number of partitioned columns in table " +
s"(s${targetTable.partitionColumnNames.size})")
}
partition.get.keys.foreach { colName =>
if (!targetTable.partitionColumnNames.contains(colName)) {
- throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
s"but the specified partition spec refers to a column that is not partitioned: " +
s"'$colName'")
}
}
} else {
if (partition.nonEmpty) {
- throw new AnalysisException(s"LOAD DATA target table '$table' is not partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $table is not partitioned, " +
s"but a partition spec was provided.")
}
}
@@ -321,31 +324,31 @@ case class TruncateTableCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
if (!catalog.tableExists(tableName)) {
- throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does not exist.")
+ throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does not exist.")
}
if (catalog.isTemporaryTable(tableName)) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on temporary tables: '$tableName'")
+ s"Operation not allowed: TRUNCATE TABLE on temporary tables: $tableName")
}
val table = catalog.getTableMetadata(tableName)
if (table.tableType == CatalogTableType.EXTERNAL) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on external tables: '$tableName'")
+ s"Operation not allowed: TRUNCATE TABLE on external tables: $tableName")
}
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'")
+ s"Operation not allowed: TRUNCATE TABLE on views: $tableName")
}
val isDatasourceTable = DDLUtils.isDatasourceTable(table)
if (isDatasourceTable && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables created using the data sources API: '$tableName'")
+ s"for tables created using the data sources API: $tableName")
}
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables that are not partitioned: '$tableName'")
+ s"for tables that are not partitioned: $tableName")
}
val locations =
if (isDatasourceTable) {
@@ -366,7 +369,7 @@ case class TruncateTableCommand(
} catch {
case NonFatal(e) =>
throw new AnalysisException(
- s"Failed to truncate table '$tableName' when removing data of the path: $path " +
+ s"Failed to truncate table $tableName when removing data of the path: $path " +
s"because of ${e.toString}")
}
}
@@ -379,7 +382,7 @@ case class TruncateTableCommand(
spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
} catch {
case NonFatal(e) =>
- log.warn(s"Exception when attempting to uncache table '$tableName'", e)
+ log.warn(s"Exception when attempting to uncache table $tableName", e)
}
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 312a1f691b..901a9b9cf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -113,9 +113,14 @@ case class CreateViewCommand(
val qualifiedName = name.copy(database = Option(database))
if (sessionState.catalog.tableExists(qualifiedName)) {
+ val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
if (allowExisting) {
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
+ } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
+ throw new AnalysisException(
+ "Existing table is not a view. The following is an existing table, " +
+ s"not a view: $qualifiedName")
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 82dc64a457..6a80664417 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
/**
@@ -55,6 +54,76 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("error handling: existing a table with the duplicate name when creating/altering a view") {
+ withTable("tab1") {
+ sql("CREATE TABLE tab1 (id int)")
+ var e = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
+ }.getMessage
+ assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
+ e = intercept[AnalysisException] {
+ sql("CREATE VIEW tab1 AS SELECT * FROM jt")
+ }.getMessage
+ assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
+ e = intercept[AnalysisException] {
+ sql("ALTER VIEW tab1 AS SELECT * FROM jt")
+ }.getMessage
+ assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
+ }
+ }
+
+ test("existing a table with the duplicate name when CREATE VIEW IF NOT EXISTS") {
+ withTable("tab1") {
+ sql("CREATE TABLE tab1 (id int)")
+ sql("CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM jt")
+ checkAnswer(sql("select count(*) FROM tab1"), Row(0))
+ }
+ }
+
+ test("error handling: insert/load/truncate table commands against a temp view") {
+ val viewName = "testView"
+ withTempView(viewName) {
+ sql(s"CREATE TEMPORARY VIEW $viewName AS SELECT id FROM jt")
+ var e = intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $viewName SELECT 1")
+ }.getMessage
+ assert(e.contains("Inserting into an RDD-based table is not allowed"))
+
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+ e = intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
+ }.getMessage
+ assert(e.contains(s"Target table in LOAD DATA cannot be temporary: `$viewName`"))
+
+ e = intercept[AnalysisException] {
+ sql(s"TRUNCATE TABLE $viewName")
+ }.getMessage
+ assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on temporary tables: `$viewName`"))
+ }
+ }
+
+ test("error handling: insert/load/truncate table commands against a view") {
+ val viewName = "testView"
+ withView(viewName) {
+ sql(s"CREATE VIEW $viewName AS SELECT id FROM jt")
+ var e = intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $viewName SELECT 1")
+ }.getMessage
+ assert(e.contains("Inserting into an RDD-based table is not allowed"))
+
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+ e = intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
+ }.getMessage
+ assert(e.contains(s"Target table in LOAD DATA cannot be a view: `$viewName`"))
+
+ e = intercept[AnalysisException] {
+ sql(s"TRUNCATE TABLE $viewName")
+ }.getMessage
+ assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `$viewName`"))
+ }
+ }
+
test("error handling: fail if the view sql itself is invalid") {
// A table that does not exist
intercept[AnalysisException] {