diff options
author | gatorsmile <gatorsmile@gmail.com> | 2016-05-23 18:03:45 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-23 18:03:45 -0700 |
commit | 5afd927a47aa7ede3039234f2f7262e2247aa2ae (patch) | |
tree | d5bd14f5548aca7666ca389af83a6be5701e70b5 | |
parent | 01659bc50cd3d53815d205d005c3678e714c08e0 (diff) | |
download | spark-5afd927a47aa7ede3039234f2f7262e2247aa2ae.tar.gz spark-5afd927a47aa7ede3039234f2f7262e2247aa2ae.tar.bz2 spark-5afd927a47aa7ede3039234f2f7262e2247aa2ae.zip |
[SPARK-15311][SQL] Disallow DML on Regular Tables when Using In-Memory Catalog
#### What changes were proposed in this pull request?
So far, when using In-Memory Catalog, we allow DDL operations for the tables. However, the corresponding DML operations are not supported for the tables that are neither temporary nor data source tables. For example,
```SQL
CREATE TABLE tabName(i INT, j STRING)
SELECT * FROM tabName
INSERT OVERWRITE TABLE tabName SELECT 1, 'a'
```
In the above example, before this PR fix, we will get very confusing exception messages for either `SELECT` or `INSERT`
```
org.apache.spark.sql.AnalysisException: unresolved operator 'SimpleCatalogRelation default, CatalogTable(`default`.`tbl`,CatalogTableType(MANAGED),CatalogStorageFormat(None,Some(org.apache.hadoop.mapred.TextInputFormat),Some(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat),None,false,Map()),List(CatalogColumn(i,int,true,None), CatalogColumn(j,string,true,None)),List(),List(),List(),-1,,1463928681802,-1,Map(),None,None,None,List()), None;
```
This PR is to issue appropriate exceptions in this case. The message will be like
```
org.apache.spark.sql.AnalysisException: Please enable Hive support when operating non-temporary tables: `tbl`;
```
#### How was this patch tested?
Added a test case in `DDLSuite`.
Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>
Closes #13093 from gatorsmile/selectAfterCreate.
3 files changed, 65 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 28aa249888..cd242d78a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.UsingJoin @@ -305,6 +306,20 @@ trait CheckAnalysis extends PredicateHelper { |Conflicting attributes: ${conflictingAttributes.mkString(",")} """.stripMargin) + case s: SimpleCatalogRelation => + failAnalysis( + s""" + |Please enable Hive support when selecting the regular tables: + |${s.catalogTable.identifier} + """.stripMargin) + + case InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) => + failAnalysis( + s""" + |Please enable Hive support when inserting the regular tables: + |${s.catalogTable.identifier} + """.stripMargin) + case o if !o.resolved => failAnalysis( s"unresolved operator ${operator.simpleString}") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index b1b3e00de1..ca0096eeb2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -377,7 +377,8 @@ case class InsertIntoTable( } assert(overwrite || !ifNotExists) - override lazy val resolved: Boolean = childrenResolved && expectedColumns.forall { expected => + override lazy val resolved: Boolean = + childrenResolved && table.resolved && expectedColumns.forall { expected => child.output.size == expected.size && child.output.zip(expected).forall { case (childAttr, tableAttr) => DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d72dc092e2..64f5a4ac47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -22,6 +22,7 @@ import java.io.File import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach +import org.apache.spark.internal.config._ import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchPartitionException, NoSuchTableException} @@ -1044,6 +1045,53 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { ) } + test("select/insert into the managed table") { + assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") + val tabName = "tbl" + withTable(tabName) { + sql(s"CREATE TABLE $tabName (i INT, j STRING)") + val catalogTable = + spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default"))) + assert(catalogTable.tableType == CatalogTableType.MANAGED) + + var message = intercept[AnalysisException] { + sql(s"INSERT OVERWRITE TABLE $tabName SELECT 1, 'a'") + }.getMessage + assert(message.contains("Please enable Hive support when inserting the regular tables")) + message = intercept[AnalysisException] { + sql(s"SELECT * FROM $tabName") + }.getMessage + assert(message.contains("Please enable Hive support when selecting the regular tables")) + } + } + + test("select/insert into external table") { + assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") + withTempDir { tempDir => + val tabName = "tbl" + withTable(tabName) { + sql( + s""" + |CREATE EXTERNAL TABLE $tabName (i INT, j STRING) + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |LOCATION '$tempDir' + """.stripMargin) + val catalogTable = + spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default"))) + assert(catalogTable.tableType == CatalogTableType.EXTERNAL) + + var message = intercept[AnalysisException] { + sql(s"INSERT OVERWRITE TABLE $tabName SELECT 1, 'a'") + }.getMessage + assert(message.contains("Please enable Hive support when inserting the regular tables")) + message = intercept[AnalysisException] { + sql(s"SELECT * FROM $tabName") + }.getMessage + assert(message.contains("Please enable Hive support when selecting the regular tables")) + } + } + } + test("drop default database") { Seq("true", "false").foreach { caseSensitive => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) { |