author    Andrew Or <andrew@databricks.com>    2016-05-26 20:11:09 -0700
committer Yin Huai <yhuai@databricks.com>      2016-05-26 20:11:09 -0700
commit    3fca635b4ed322208debcd89a539e42cdde6bbd4 (patch)
tree      49f48c5c8cc7cef3c16fae44220adcdf319c65e8 /sql
parent    6ab973ec51f3db72de1766ad8d8316b7a58ed0a0 (diff)
[SPARK-15583][SQL] Disallow altering datasource properties
## What changes were proposed in this pull request?

Certain table properties (and SerDe properties) are in the protected namespace `spark.sql.sources.`, which we use internally for datasource tables. The user should not be allowed to
(1) Create a Hive table setting these properties
(2) Alter these properties in an existing table

Previously, we threw an exception if the user tried to alter the properties of an existing datasource table. However, this is overly restrictive for datasource tables and does not do anything for Hive tables.

## How was this patch tested?

DDLSuite

Author: Andrew Or <andrew@databricks.com>

Closes #13341 from andrewor14/alter-table-props.
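For illustration, a hedged sketch of statements this change rejects (assuming an active SparkSession named `spark` and a pre-existing table `tab1`; the statements mirror the DDLSuite tests further down):

// Both statements now fail with AnalysisException because the property key
// falls inside the reserved `spark.sql.sources.` namespace:
spark.sql("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me' = 'anything')")
spark.sql("ALTER TABLE tab1 SET TBLPROPERTIES ('spark.sql.sources.foo' = 'loser')")
// Ordinary property keys continue to work, including on datasource tables:
spark.sql("ALTER TABLE tab1 SET TBLPROPERTIES ('andrew' = 'or14')")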
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala |  17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                    |  37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                 |   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala               | 148
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala              |   2
5 files changed, 139 insertions(+), 67 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 6ca66a22df..deedb68a78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -255,6 +255,23 @@ case class CreateDataSourceTableAsSelectCommand(
object CreateDataSourceTableUtils extends Logging {
+
+ // TODO: Actually replace usages with these variables (SPARK-15584)
+
+ val DATASOURCE_PREFIX = "spark.sql.sources."
+ val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
+ val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
+ val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
+ val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_PREFIX + "schema."
+ val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
+ val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
+ val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
+ val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols"
+ val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
+ val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
+ val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
+ val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
+
/**
* Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
* i.e. if this name only contains characters, numbers, and _.
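For orientation, a hedged sketch of the kind of entries these reserved keys hold in a datasource table's catalog metadata (the values below are hypothetical placeholders; only the key names come from the constants above):

// Hypothetical example values; key names correspond to the constants above.
val exampleReservedProps = Map(
  "spark.sql.sources.provider"        -> "parquet",                           // DATASOURCE_PROVIDER
  "spark.sql.sources.schema.numParts" -> "1",                                 // DATASOURCE_SCHEMA_NUMPARTS
  "spark.sql.sources.schema.part.0"   -> """{"type":"struct","fields":[]}"""  // DATASOURCE_SCHEMA_PART_PREFIX + "0"
)
// Every key starts with DATASOURCE_PREFIX, which is exactly what the new
// verifyTableProperties check (in ddl.scala below) uses to reject user DDL.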
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7ce7bb903d..15eba3b011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.types._
@@ -228,15 +229,13 @@ case class AlterTableSetPropertiesCommand(
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
+ val ident = if (isView) "VIEW" else "TABLE"
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
+ DDLUtils.verifyTableProperties(properties.keys.toSeq, s"ALTER $ident")
val table = catalog.getTableMetadata(tableName)
- val newProperties = table.properties ++ properties
- if (DDLUtils.isDatasourceTable(newProperties)) {
- throw new AnalysisException("ALTER TABLE SET TBLPROPERTIES is not supported for " +
- "tables defined using the datasource API")
- }
- val newTable = table.copy(properties = newProperties)
+ // This overrides old properties
+ val newTable = table.copy(properties = table.properties ++ properties)
catalog.alterTable(newTable)
Seq.empty[Row]
}
@@ -260,18 +259,16 @@ case class AlterTableUnsetPropertiesCommand(
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
+ val ident = if (isView) "VIEW" else "TABLE"
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
+ DDLUtils.verifyTableProperties(propKeys, s"ALTER $ident")
val table = catalog.getTableMetadata(tableName)
- if (DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException(
- "alter table properties is not supported for datasource tables")
- }
if (!ifExists) {
propKeys.foreach { k =>
if (!table.properties.contains(k)) {
throw new AnalysisException(
- s"attempted to unset non-existent property '$k' in table '$tableName'")
+ s"Attempted to unset non-existent property '$k' in table '$tableName'")
}
}
}
@@ -304,11 +301,15 @@ case class AlterTableSerDePropertiesCommand(
"ALTER TABLE attempted to set neither serde class name nor serde properties")
override def run(sparkSession: SparkSession): Seq[Row] = {
+ DDLUtils.verifyTableProperties(
+ serdeProperties.toSeq.flatMap(_.keys.toSeq),
+ "ALTER TABLE SERDEPROPERTIES")
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
// Do not support setting serde for datasource tables
if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
- throw new AnalysisException("ALTER TABLE SET SERDE is not supported for datasource tables")
+ throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
+ "not supported for tables created with the datasource API")
}
val newTable = table.withNewStorage(
serde = serdeClassName.orElse(table.storage.serde),
@@ -489,6 +490,18 @@ object DDLUtils {
})
}
+ /**
+ * If the given table properties (or SerDe properties) contains datasource properties,
+ * throw an exception.
+ */
+ def verifyTableProperties(propKeys: Seq[String], operation: String): Unit = {
+ val datasourceKeys = propKeys.filter(_.startsWith(DATASOURCE_PREFIX))
+ if (datasourceKeys.nonEmpty) {
+ throw new AnalysisException(s"Operation not allowed: $operation property keys may not " +
+ s"start with '$DATASOURCE_PREFIX': ${datasourceKeys.mkString("[", ", ", "]")}")
+ }
+ }
+
def isTablePartitioned(table: CatalogTable): Boolean = {
table.partitionColumns.nonEmpty ||
table.properties.contains("spark.sql.sources.schema.numPartCols")
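The new check is a plain string-prefix filter over the supplied property keys; a minimal standalone sketch of the rule it enforces (the key names here are hypothetical):

val keys = Seq("comment", "owner", "spark.sql.sources.provider")
val rejected = keys.filter(_.startsWith("spark.sql.sources."))
assert(rejected == Seq("spark.sql.sources.provider"))  // only keys in the reserved namespace are flagged
// In verifyTableProperties above, a non-empty rejected list raises an
// AnalysisException naming the offending operation and keys.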
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e34beec33d..d1024090d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -118,6 +118,8 @@ case class CreateTableLikeCommand(
case class CreateTableCommand(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
+ DDLUtils.verifyTableProperties(table.properties.keys.toSeq, "CREATE TABLE")
+ DDLUtils.verifyTableProperties(table.storage.serdeProperties.keys.toSeq, "CREATE TABLE")
sparkSession.sessionState.catalog.createTable(table, ifNotExists)
Seq.empty[Row]
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 6c038c7735..ff56749387 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils.DATASOURCE_PREFIX
import org.apache.spark.sql.execution.datasources.BucketSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -489,63 +490,19 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("alter table: set properties") {
- val catalog = spark.sessionState.catalog
- val tableIdent = TableIdentifier("tab1", Some("dbx"))
- createDatabase(catalog, "dbx")
- createTable(catalog, tableIdent)
- assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
- // set table properties
- sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
- assert(catalog.getTableMetadata(tableIdent).properties ==
- Map("andrew" -> "or14", "kor" -> "bel"))
- // set table properties without explicitly specifying database
- catalog.setCurrentDatabase("dbx")
- sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
- assert(catalog.getTableMetadata(tableIdent).properties ==
- Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
- // table to alter does not exist
- intercept[AnalysisException] {
- sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
- }
- // throw exception for datasource tables
- convertToDatasourceTable(catalog, tableIdent)
- val e = intercept[AnalysisException] {
- sql("ALTER TABLE tab1 SET TBLPROPERTIES ('sora' = 'bol')")
- }
- assert(e.getMessage.contains("datasource"))
+ testSetProperties(isDatasourceTable = false)
+ }
+
+ test("alter table: set properties (datasource table)") {
+ testSetProperties(isDatasourceTable = true)
}
test("alter table: unset properties") {
- val catalog = spark.sessionState.catalog
- val tableIdent = TableIdentifier("tab1", Some("dbx"))
- createDatabase(catalog, "dbx")
- createTable(catalog, tableIdent)
- // unset table properties
- sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
- sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
- assert(catalog.getTableMetadata(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
- // unset table properties without explicitly specifying database
- catalog.setCurrentDatabase("dbx")
- sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
- assert(catalog.getTableMetadata(tableIdent).properties == Map("c" -> "lan"))
- // table to alter does not exist
- intercept[AnalysisException] {
- sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
- }
- // property to unset does not exist
- val e = intercept[AnalysisException] {
- sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
- }
- assert(e.getMessage.contains("xyz"))
- // property to unset does not exist, but "IF EXISTS" is specified
- sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
- assert(catalog.getTableMetadata(tableIdent).properties.isEmpty)
- // throw exception for datasource tables
- convertToDatasourceTable(catalog, tableIdent)
- val e1 = intercept[AnalysisException] {
- sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('sora')")
- }
- assert(e1.getMessage.contains("datasource"))
+ testUnsetProperties(isDatasourceTable = false)
+ }
+
+ test("alter table: unset properties (datasource table)") {
+ testUnsetProperties(isDatasourceTable = true)
}
test("alter table: set serde") {
@@ -768,6 +725,78 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
properties = Map("spark.sql.sources.provider" -> "csv")))
}
+ private def testSetProperties(isDatasourceTable: Boolean): Unit = {
+ val catalog = spark.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1", Some("dbx"))
+ createDatabase(catalog, "dbx")
+ createTable(catalog, tableIdent)
+ if (isDatasourceTable) {
+ convertToDatasourceTable(catalog, tableIdent)
+ }
+ def getProps: Map[String, String] = {
+ catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
+ !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
+ }
+ }
+ assert(getProps.isEmpty)
+ // set table properties
+ sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
+ assert(getProps == Map("andrew" -> "or14", "kor" -> "bel"))
+ // set table properties without explicitly specifying database
+ catalog.setCurrentDatabase("dbx")
+ sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
+ assert(getProps == Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
+ // table to alter does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
+ }
+ // datasource table property keys are not allowed
+ val e = intercept[AnalysisException] {
+ sql(s"ALTER TABLE tab1 SET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo' = 'loser')")
+ }
+ assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+ }
+
+ private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
+ val catalog = spark.sessionState.catalog
+ val tableIdent = TableIdentifier("tab1", Some("dbx"))
+ createDatabase(catalog, "dbx")
+ createTable(catalog, tableIdent)
+ if (isDatasourceTable) {
+ convertToDatasourceTable(catalog, tableIdent)
+ }
+ def getProps: Map[String, String] = {
+ catalog.getTableMetadata(tableIdent).properties.filterKeys { k =>
+ !isDatasourceTable || !k.startsWith(DATASOURCE_PREFIX)
+ }
+ }
+ // unset table properties
+ sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
+ sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
+ assert(getProps == Map("p" -> "an", "c" -> "lan", "x" -> "y"))
+ // unset table properties without explicitly specifying database
+ catalog.setCurrentDatabase("dbx")
+ sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
+ assert(getProps == Map("c" -> "lan", "x" -> "y"))
+ // table to alter does not exist
+ intercept[AnalysisException] {
+ sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
+ }
+ // property to unset does not exist
+ val e = intercept[AnalysisException] {
+ sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
+ }
+ assert(e.getMessage.contains("xyz"))
+ // property to unset does not exist, but "IF EXISTS" is specified
+ sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
+ assert(getProps == Map("x" -> "y"))
+ // datasource table property keys are not allowed
+ val e2 = intercept[AnalysisException] {
+ sql(s"ALTER TABLE tab1 UNSET TBLPROPERTIES ('${DATASOURCE_PREFIX}foo')")
+ }
+ assert(e2.getMessage.contains(DATASOURCE_PREFIX + "foo"))
+ }
+
private def testSetLocation(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -870,6 +899,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
}
+ // serde properties must not be a datasource property
+ val e = intercept[AnalysisException] {
+ sql(s"ALTER TABLE tab1 SET SERDEPROPERTIES ('${DATASOURCE_PREFIX}foo'='wah')")
+ }
+ assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
}
private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
@@ -1091,6 +1125,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
+ test("create table with datasource properties (not allowed)") {
+ assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
+ assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +
+ "WITH SERDEPROPERTIES ('spark.sql.sources.me'='anything')")
+ }
+
test("drop default database") {
Seq("true", "false").foreach { caseSensitive =>
withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index e2cef38556..80e6f4ec70 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -287,7 +287,7 @@ class HiveDDLSuite
sql(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')")
}.getMessage
assert(message.contains(
- "attempted to unset non-existent property 'p' in table '`view1`'"))
+ "Attempted to unset non-existent property 'p' in table '`view1`'"))
}
}
}